HBASE-7204 Make hbck ErrorReporter pluggable

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1415871 13f79535-47bb-0310-9956-ffa450edef68
jxiang 2012-11-30 22:16:51 +00:00
parent ac3c5eb6fa
commit 51dc20638e
2 changed files with 150 additions and 53 deletions

HBaseFsck.java

@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -84,8 +85,8 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
@@ -95,6 +96,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Joiner;
@@ -151,7 +155,7 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class HBaseFsck {
+public class HBaseFsck extends Configured implements Tool {
public static final long DEFAULT_TIME_LAG = 60000; // default value of 1 minute
public static final long DEFAULT_SLEEP_BEFORE_RERUN = 10000;
private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
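The switch from a plain class to "extends Configured implements Tool" hands argument handling to Hadoop's ToolRunner, which folds generic options such as -D key=value into the tool's Configuration before run() is invoked. A minimal sketch of that pattern, using a hypothetical EchoTool class that is not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical illustration of the Tool pattern HBaseFsck now follows.
public class EchoTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // By the time run() executes, ToolRunner has merged generic
    // options (-D, -conf, ...) into the Configuration from getConf().
    System.out.println("some.key = " + getConf().get("some.key", "<unset>"));
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new EchoTool(), args));
  }
}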
@@ -164,7 +168,6 @@ public class HBaseFsck {
* Internal resources
**********************/
private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
-private Configuration conf;
private ClusterStatus status;
private HConnection connection;
private HBaseAdmin admin;
@@ -205,7 +208,7 @@
/*********
* State
*********/
-private ErrorReporter errors = new PrintingErrorReporter();
+final private ErrorReporter errors;
int fixes = 0;
/**
@@ -246,8 +249,9 @@
* @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
*/
public HBaseFsck(Configuration conf) throws MasterNotRunningException,
-ZooKeeperConnectionException, IOException {
-this.conf = conf;
+ZooKeeperConnectionException, IOException, ClassNotFoundException {
+super(conf);
+errors = getErrorReporter(conf);
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
executor = new ScheduledThreadPoolExecutor(numThreads);
@@ -264,8 +268,9 @@
* if unable to connect to ZooKeeper
*/
public HBaseFsck(Configuration conf, ExecutorService exec) throws MasterNotRunningException,
-ZooKeeperConnectionException, IOException {
-this.conf = conf;
+ZooKeeperConnectionException, IOException, ClassNotFoundException {
+super(conf);
+errors = getErrorReporter(getConf());
this.executor = exec;
}
@@ -274,8 +279,8 @@
* online state.
*/
public void connect() throws IOException {
-admin = new HBaseAdmin(conf);
-meta = new HTable(conf, HConstants.META_TABLE_NAME);
+admin = new HBaseAdmin(getConf());
+meta = new HTable(getConf(), HConstants.META_TABLE_NAME);
status = admin.getClusterStatus();
connection = admin.getConnection();
}
@@ -343,7 +348,7 @@
|| shouldFixHdfsOverlaps() || shouldFixTableOrphans())) {
LOG.info("Loading regioninfos HDFS");
// if nothing is happening this should always complete in two iterations.
-int maxIterations = conf.getInt("hbase.hbck.integrityrepair.iterations.max", 3);
+int maxIterations = getConf().getInt("hbase.hbck.integrityrepair.iterations.max", 3);
int curIter = 0;
do {
clearState(); // clears hbck state and reset fixes to 0 and.
@@ -463,7 +468,7 @@
*/
private void adoptHdfsOrphan(HbckInfo hi) throws IOException {
Path p = hi.getHdfsRegionDir();
-FileSystem fs = p.getFileSystem(conf);
+FileSystem fs = p.getFileSystem(getConf());
FileStatus[] dirs = fs.listStatus(p);
if (dirs == null) {
LOG.warn("Attempt to adopt ophan hdfs region skipped becuase no files present in " +
@@ -488,7 +493,7 @@
byte[] start, end;
HFile.Reader hf = null;
try {
-CacheConfig cacheConf = new CacheConfig(conf);
+CacheConfig cacheConf = new CacheConfig(getConf());
hf = HFile.createReader(fs, hfile.getPath(), cacheConf);
hf.loadFileInfo();
KeyValue startKv = KeyValue.createKeyValueFromKey(hf.getFirstKey());
@@ -536,7 +541,7 @@
// create new region on hdfs. move data into place.
HRegionInfo hri = new HRegionInfo(template.getName(), orphanRegionRange.getFirst(), orphanRegionRange.getSecond());
LOG.info("Creating new region : " + hri);
-HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, hri, template);
+HRegion region = HBaseFsckRepair.createHDFSRegionDir(getConf(), hri, template);
Path target = region.getRegionDir();
// rename all the data to new region
@@ -646,7 +651,7 @@
// already loaded data
return;
}
-HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(FileSystem.get(this.conf), regionDir);
+HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(FileSystem.get(getConf()), regionDir);
LOG.debug("HRegionInfo read: " + hri.toString());
hbi.hdfsEntry.hri = hri;
}
@@ -716,11 +721,11 @@
if (modTInfo == null) {
// only executed once per table.
modTInfo = new TableInfo(tableName);
-Path hbaseRoot = FSUtils.getRootDir(conf);
+Path hbaseRoot = FSUtils.getRootDir(getConf());
tablesInfo.put(tableName, modTInfo);
try {
HTableDescriptor htd =
-FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(conf),
+FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(getConf()),
hbaseRoot, tableName);
modTInfo.htds.add(htd);
} catch (IOException ioe) {
@@ -749,7 +754,7 @@
*/
private Set<String> getColumnFamilyList(Set<String> columns, HbckInfo hbi) throws IOException {
Path regionDir = hbi.getHdfsRegionDir();
-FileSystem fs = regionDir.getFileSystem(conf);
+FileSystem fs = regionDir.getFileSystem(getConf());
FileStatus[] subDirs = fs.listStatus(regionDir, new FSUtils.FamilyDirFilter(fs));
for (FileStatus subdir : subDirs) {
String columnfamily = subdir.getPath().getName();
@@ -772,7 +777,7 @@
for (String columnfamimly : columns) {
htd.addFamily(new HColumnDescriptor(columnfamimly));
}
-FSTableDescriptors.createTableDescriptor(htd, conf, true);
+FSTableDescriptors.createTableDescriptor(htd, getConf(), true);
return true;
}
@@ -788,12 +793,12 @@
public void fixOrphanTables() throws IOException {
if (shouldFixTableOrphans() && !orphanTableDirs.isEmpty()) {
-Path hbaseRoot = FSUtils.getRootDir(conf);
+Path hbaseRoot = FSUtils.getRootDir(getConf());
List<String> tmpList = new ArrayList<String>();
tmpList.addAll(orphanTableDirs.keySet());
HTableDescriptor[] htds = getHTableDescriptors(tmpList);
Iterator<Entry<String, Set<String>>> iter = orphanTableDirs.entrySet().iterator();
-int j = 0;
+int j = 0;
int numFailedCase = 0;
while (iter.hasNext()) {
Entry<String, Set<String>> entry = (Entry<String, Set<String>>) iter.next();
@@ -804,7 +809,7 @@
HTableDescriptor htd = htds[j];
LOG.info("fixing orphan table: " + tableName + " from cache");
FSTableDescriptors.createTableDescriptor(
-hbaseRoot.getFileSystem(conf), hbaseRoot, htd, true);
+hbaseRoot.getFileSystem(getConf()), hbaseRoot, htd, true);
j++;
iter.remove();
}
@@ -843,8 +848,8 @@
* @return an open .META. HRegion
*/
private HRegion createNewRootAndMeta() throws IOException {
-Path rootdir = new Path(conf.get(HConstants.HBASE_DIR));
-Configuration c = conf;
+Path rootdir = new Path(getConf().get(HConstants.HBASE_DIR));
+Configuration c = getConf();
HRegionInfo rootHRI = new HRegionInfo(HRegionInfo.ROOT_REGIONINFO);
MasterFileSystem.setInfoFamilyCachingForRoot(false);
HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
@@ -983,7 +988,7 @@
for (TableInfo tInfo : tablesInfo.values()) {
TableIntegrityErrorHandler handler;
if (fixHoles || fixOverlaps) {
-handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, conf,
+handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, getConf(),
fixHoles, fixOverlaps);
} else {
handler = tInfo.new IntegrityFixSuggester(tInfo, errors);
@@ -998,7 +1003,7 @@
private Path getSidelineDir() throws IOException {
if (sidelineDir == null) {
-Path hbaseDir = FSUtils.getRootDir(conf);
+Path hbaseDir = FSUtils.getRootDir(getConf());
Path hbckDir = new Path(hbaseDir, HConstants.HBCK_SIDELINEDIR_NAME);
sidelineDir = new Path(hbckDir, hbaseDir.getName() + "-"
+ startMillis);
@@ -1115,8 +1120,8 @@
*/
Path sidelineOldRootAndMeta() throws IOException {
// put current -ROOT- and .META. aside.
-Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
-FileSystem fs = hbaseDir.getFileSystem(conf);
+Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+FileSystem fs = hbaseDir.getFileSystem(getConf());
Path backupDir = getSidelineDir();
fs.mkdirs(backupDir);
@@ -1148,7 +1153,7 @@
*/
private void loadDisabledTables()
throws ZooKeeperConnectionException, IOException {
-HConnectionManager.execute(new HConnectable<Void>(conf) {
+HConnectionManager.execute(new HConnectable<Void>(getConf()) {
@Override
public Void connect(HConnection connection) throws IOException {
ZooKeeperWatcher zkw = createZooKeeperWatcher();
@@ -1178,8 +1183,8 @@
* regionInfoMap
*/
public void loadHdfsRegionDirs() throws IOException, InterruptedException {
-Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
-FileSystem fs = rootDir.getFileSystem(conf);
+Path rootDir = new Path(getConf().get(HConstants.HBASE_DIR));
+FileSystem fs = rootDir.getFileSystem(getConf());
// list all tables from HDFS
List<FileStatus> tableDirs = Lists.newArrayList();
@@ -1207,8 +1212,8 @@
LOG.info("Trying to create a new " + HConstants.VERSION_FILE_NAME
+ " file.");
setShouldRerun();
-FSUtils.setVersion(fs, rootDir, conf.getInt(
-HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), conf.getInt(
+FSUtils.setVersion(fs, rootDir, getConf().getInt(
+HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), getConf().getInt(
HConstants.VERSION_FILE_WRITE_ATTEMPTS,
HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
}
@@ -1265,7 +1270,7 @@
}
private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
-return new ZooKeeperWatcher(conf, "hbase Fsck", new Abortable() {
+return new ZooKeeperWatcher(getConf(), "hbase Fsck", new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.error(why, e);
@@ -1337,8 +1342,8 @@
return;
}
-Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
-FileSystem fs = hbaseDir.getFileSystem(conf);
+Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+FileSystem fs = hbaseDir.getFileSystem(getConf());
UserGroupInformation ugi = User.getCurrent().getUGI();
FileStatus[] files = fs.listStatus(hbaseDir);
for (FileStatus file : files) {
@@ -1558,7 +1563,7 @@
}
LOG.info("Patching .META. with .regioninfo: " + hbi.getHdfsHRI());
-HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
tryAssignmentRepair(hbi, "Trying to reassign region...");
}
@@ -1574,7 +1579,7 @@
}
LOG.info("Patching .META. with with .regioninfo: " + hbi.getHdfsHRI());
-HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
tryAssignmentRepair(hbi, "Trying to fix unassigned region...");
}
@@ -1736,7 +1741,7 @@
debugLsr(contained.getHdfsRegionDir());
// rename the contained into the container.
-FileSystem fs = targetRegionDir.getFileSystem(conf);
+FileSystem fs = targetRegionDir.getFileSystem(getConf());
FileStatus[] dirs = fs.listStatus(contained.getHdfsRegionDir());
if (dirs == null) {
@@ -2351,7 +2356,7 @@
HTableDescriptor[] htd = new HTableDescriptor[0];
try {
LOG.info("getHTableDescriptors == tableNames => " + tableNames);
-htd = new HBaseAdmin(conf).getTableDescriptors(tableNames);
+htd = new HBaseAdmin(getConf()).getTableDescriptors(tableNames);
} catch (IOException e) {
LOG.debug("Exception getting table descriptors", e);
}
@@ -2494,12 +2499,12 @@
};
// Scan -ROOT- to pick up META regions
-MetaScanner.metaScan(conf, visitor, null, null,
+MetaScanner.metaScan(getConf(), visitor, null, null,
Integer.MAX_VALUE, HConstants.ROOT_TABLE_NAME);
if (!checkMetaOnly) {
// Scan .META. to pick up user regions
-MetaScanner.metaScan(conf, visitor);
+MetaScanner.metaScan(getConf(), visitor);
}
errors.print("");
@@ -2753,6 +2758,12 @@
}
}
+private static ErrorReporter getErrorReporter(
+final Configuration conf) throws ClassNotFoundException {
+Class<? extends ErrorReporter> reporter = conf.getClass("hbasefsck.errorreporter", PrintingErrorReporter.class, ErrorReporter.class);
+return (ErrorReporter)ReflectionUtils.newInstance(reporter, conf);
+}
public interface ErrorReporter {
public static enum ERROR_CODE {
UNKNOWN, NO_META_REGION, NULL_ROOT_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
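The new getErrorReporter method reads the hbasefsck.errorreporter key, falls back to PrintingErrorReporter, and instantiates the chosen class through ReflectionUtils.newInstance (which also hands the Configuration to the instance if it is Configurable). A hedged usage sketch; com.example.MyErrorReporter is a stand-in for any ErrorReporter implementation on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HBaseFsck;

public class PluggableReporterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical class name; it must implement HBaseFsck.ErrorReporter.
    conf.set("hbasefsck.errorreporter", "com.example.MyErrorReporter");
    // The constructor calls getErrorReporter(conf), so the reporter is
    // resolved here; PrintingErrorReporter is used when the key is unset.
    HBaseFsck fsck = new HBaseFsck(conf);
  }
}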
@@ -3254,7 +3265,7 @@
}
protected HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
-return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles);
+return new HFileCorruptionChecker(getConf(), executor, sidelineCorruptHFiles);
}
public HFileCorruptionChecker getHFilecorruptionChecker() {
@@ -3327,7 +3338,6 @@
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// create a fsck object
Configuration conf = HBaseConfiguration.create();
Path hbasedir = new Path(conf.get(HConstants.HBASE_DIR));
@@ -3335,12 +3345,14 @@
conf.set("fs.defaultFS", defaultFs.toString()); // for hadoop 0.21+
conf.set("fs.default.name", defaultFs.toString()); // for hadoop 0.20
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
-ExecutorService exec = new ScheduledThreadPoolExecutor(numThreads);
-HBaseFsck hbck = new HBaseFsck(conf, exec);
-hbck.exec(exec, args);
-int retcode = hbck.getRetCode();
-Runtime.getRuntime().exit(retcode);
+int ret = ToolRunner.run(new HBaseFsck(conf), args);
+System.exit(ret);
}
+@Override
+public int run(String[] args) throws Exception {
+exec(executor, args);
+return getRetCode();
+}
public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
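Note that because errors is final and resolved inside the constructor, a custom reporter must be set on the Configuration before HBaseFsck is constructed, as the new test in the second file does; generic options parsed by ToolRunner land in the Configuration only after construction, so a -D override cannot swap the reporter. What ToolRunner does provide is the standard run()/exit-code plumbing, sketched here under no assumptions beyond the new main and run() above:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.util.ToolRunner;

public class RunHbck {
  public static void main(String[] args) throws Exception {
    // ToolRunner invokes run(args), which wraps exec(executor, args)
    // and surfaces getRetCode() as the process exit status.
    int ret = ToolRunner.run(new HBaseFsck(HBaseConfiguration.create()), args);
    System.exit(ret);
  }
}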
@@ -3499,13 +3511,13 @@
setHFileCorruptionChecker(hfcc); // so we can get result
Collection<String> tables = getIncludedTables();
Collection<Path> tableDirs = new ArrayList<Path>();
-Path rootdir = FSUtils.getRootDir(conf);
+Path rootdir = FSUtils.getRootDir(getConf());
if (tables.size() > 0) {
for (String t : tables) {
tableDirs.add(FSUtils.getTablePath(rootdir, t));
}
} else {
-tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
+tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(getConf()), rootdir);
}
hfcc.checkTables(tableDirs);
PrintWriter out = new PrintWriter(System.out);
@@ -3545,7 +3557,7 @@
* ls -r for debugging purposes
*/
void debugLsr(Path p) throws IOException {
-debugLsr(conf, p);
+debugLsr(getConf(), p);
}
/**

TestHBaseFsck.java

@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.util.HBaseFsck.TableInfo;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
@@ -1660,6 +1662,89 @@ public class TestHBaseFsck {
doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
}
+/**
+* Test pluggable error reporter. It can be plugged in
+* from system property or configuration.
+*/
+@Test
+public void testErrorReporter() throws Exception {
+try {
+MockErrorReporter.calledCount = 0;
+doFsck(conf, false);
+assertEquals(MockErrorReporter.calledCount, 0);
+conf.set("hbasefsck.errorreporter", MockErrorReporter.class.getName());
+doFsck(conf, false);
+assertTrue(MockErrorReporter.calledCount > 20);
+} finally {
+conf.set("hbasefsck.errorreporter", "");
+MockErrorReporter.calledCount = 0;
+}
+}
+static class MockErrorReporter implements ErrorReporter {
+static int calledCount = 0;
+public void clear() {
+calledCount++;
+}
+public void report(String message) {
+calledCount++;
+}
+public void reportError(String message) {
+calledCount++;
+}
+public void reportError(ERROR_CODE errorCode, String message) {
+calledCount++;
+}
+public void reportError(ERROR_CODE errorCode, String message, TableInfo table) {
+calledCount++;
+}
+public void reportError(ERROR_CODE errorCode,
+String message, TableInfo table, HbckInfo info) {
+calledCount++;
+}
+public void reportError(ERROR_CODE errorCode, String message,
+TableInfo table, HbckInfo info1, HbckInfo info2) {
+calledCount++;
+}
+public int summarize() {
+return ++calledCount;
+}
+public void detail(String details) {
+calledCount++;
+}
+public ArrayList<ERROR_CODE> getErrorList() {
+calledCount++;
+return new ArrayList<ERROR_CODE>();
+}
+public void progress() {
+calledCount++;
+}
+public void print(String message) {
+calledCount++;
+}
+public void resetErrors() {
+calledCount++;
+}
+public boolean tableHasErrors(TableInfo table) {
+calledCount++;
+return false;
+}
+}
@org.junit.Rule
public TestName name = new TestName();