HBASE-6586 Quarantine Corrupted HFiles with hbck

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1377941 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Hsieh 2012-08-28 01:55:21 +00:00
parent 93e03d7d09
commit f63e6a41a5
13 changed files with 1026 additions and 92 deletions

View File

@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -30,7 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DoNotRetryIOException extends IOException {
public class DoNotRetryIOException extends HBaseIOException {
private static final long serialVersionUID = 1197446454511704139L;

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* All hbase specific IOExceptions should be subclasses of HBaseIOException
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HBaseIOException extends IOException {
private static final long serialVersionUID = 1L;
public HBaseIOException() {
super();
}
/**
* {@inheritDoc}
*/
public HBaseIOException(String message) {
super(message);
}
/**
* {@inheritDoc}
**/
public HBaseIOException(String message, Throwable cause) {
super(message, cause);
}
/**
* {@inheritDoc}
*/
public HBaseIOException(Throwable cause) {
super(cause);
}}
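
Since all HBase-specific IOExceptions are meant to extend this new base class (as the DoNotRetryIOException change above now does), defining a new one is just a matter of subclassing. A minimal sketch; the class name here is hypothetical and not part of this commit:

package org.apache.hadoop.hbase;

/** Hypothetical example of an HBase-specific IOException built on HBaseIOException. */
public class ExampleHBaseIOException extends HBaseIOException {
  private static final long serialVersionUID = 1L;

  public ExampleHBaseIOException(String message) {
    super(message);
  }

  public ExampleHBaseIOException(String message, Throwable cause) {
    super(message, cause);
  }
}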

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* This exception is thrown when attempts to read an HFile fail due to corruption or truncation
* issues.
*/
@InterfaceAudience.Private
public class CorruptHFileException extends DoNotRetryIOException {
private static final long serialVersionUID = 1L;
CorruptHFileException(String m, Throwable t) {
super(m, t);
}
CorruptHFileException(String m) {
super(m);
}
}
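
Because CorruptHFileException extends DoNotRetryIOException, a caller can distinguish a corrupt or truncated file from a transient read failure and avoid retrying. A rough sketch of such a caller; only HFile.createReader, CacheConfig, and the exception type come from the code in this commit, while the wrapper class and the path argument are illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class OpenHFileExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path hfilePath = new Path(args[0]); // path to an hfile, supplied by the caller
    try {
      HFile.Reader reader = HFile.createReader(fs, hfilePath, new CacheConfig(conf));
      System.out.println("Successfully opened " + hfilePath);
      reader.close(true);
    } catch (CorruptHFileException che) {
      // Corrupt or truncated file: do not retry; sideline or report it instead.
      System.err.println("Corrupt hfile " + hfilePath + ": " + che.getMessage());
    }
  }
}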

View File

@ -19,6 +19,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@ -34,9 +37,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
import com.google.common.io.NullOutputStream;
/**
@ -322,12 +322,7 @@ public class FixedFileTrailer {
int majorVersion = extractMajorVersion(version);
int minorVersion = extractMinorVersion(version);
try {
HFile.checkFormatVersion(majorVersion);
} catch (IllegalArgumentException iae) {
// In this context, an invalid version might indicate a corrupt HFile.
throw new IOException(iae);
}
HFile.checkFormatVersion(majorVersion); // throws IAE if invalid
int trailerSize = getTrailerSize(majorVersion);

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -521,12 +520,32 @@ public class HFile {
DataBlockEncoding getEncodingOnDisk();
}
/**
* Method returns the reader given the specified arguments.
* TODO This is a bad abstraction. See HBASE-6635.
*
* @param path hfile's path
* @param fsdis an open checksummed stream of path's file
* @param fsdisNoFsChecksum an open unchecksummed stream of path's file
* @param size max size of the trailer.
* @param closeIStream boolean for closing file after getting the reader version.
* @param cacheConf Cache configuration values, cannot be null.
* @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
* @param hfs the HFileSystem used to access the file
* @return an appropriate instance of HFileReader
* @throws IOException If the file is invalid, a CorruptHFileException (a DoNotRetryIOException subtype) is thrown
*/
private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
FSDataInputStream fsdisNoFsChecksum,
long size, boolean closeIStream, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache, HFileSystem hfs)
throws IOException {
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
FixedFileTrailer trailer = null;
try {
trailer = FixedFileTrailer.readFromStream(fsdis, size);
} catch (IllegalArgumentException iae) {
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
}
switch (trailer.getMajorVersion()) {
case 1:
return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
@ -536,11 +555,18 @@ public class HFile {
size, closeIStream,
cacheConf, preferredEncodingInCache, hfs);
default:
throw new IOException("Cannot instantiate reader for HFile version " +
trailer.getMajorVersion());
throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
}
}
/**
* @param fs A file system
* @param path Path to HFile
* @param cacheConf Cache configuration for hfile's contents
* @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
* @return A version-specific HFile Reader
* @throws IOException If the file is invalid, a CorruptHFileException (a DoNotRetryIOException subtype) is thrown
*/
public static Reader createReaderWithEncoding(
FileSystem fs, Path path, CacheConfig cacheConf,
DataBlockEncoding preferredEncodingInCache) throws IOException {
@ -570,7 +596,8 @@ public class HFile {
* @param fs filesystem
* @param path Path to file to read
* @param cacheConf This must not be null. @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
* @return an active Reader instance.
* @return an active Reader instance
* @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
*/
public static Reader createReader(
FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {

View File

@ -498,7 +498,7 @@ public class HLogSplitter {
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
"hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
if (!fs.mkdirs(corruptDir)) {
LOG.info("Unable to mkdir " + corruptDir);

View File

@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -48,6 +49,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@ -1032,6 +1034,114 @@ public abstract class FSUtils {
return new Path(rootdir, tableName);
}
/**
* Filter that accepts only region directories (hex-named dirs); dot-prefixed dirs and other non-region entries are excluded.
*/
public static class RegionDirFilter implements PathFilter {
// This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
final FileSystem fs;
public RegionDirFilter(FileSystem fs) {
this.fs = fs;
}
@Override
public boolean accept(Path rd) {
if (!regionDirPattern.matcher(rd.getName()).matches()) {
return false;
}
try {
return fs.getFileStatus(rd).isDir();
} catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe);
return false;
}
}
}
/**
* Given a particular table dir, return all the regiondirs inside it, excluding files such as
* .tableinfo
* @param fs A file system for the Path
* @param tableDir Path to a specific table directory <hbase.rootdir>/<tabledir>
* @return List of paths to valid region directories in table dir.
* @throws IOException
*/
public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
// assumes we are in a table dir.
FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
List<Path> regionDirs = new ArrayList<Path>(rds.length);
for (FileStatus rdfs: rds) {
Path rdPath = rdfs.getPath();
regionDirs.add(rdPath);
}
return regionDirs;
}
/**
* Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
*/
public static class FamilyDirFilter implements PathFilter {
final FileSystem fs;
public FamilyDirFilter(FileSystem fs) {
this.fs = fs;
}
@Override
public boolean accept(Path rd) {
try {
// throws IAE if invalid
HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
} catch (IllegalArgumentException iae) {
// path name is an invalid family name and thus is excluded.
return false;
}
try {
return fs.getFileStatus(rd).isDir();
} catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe);
return false;
}
}
}
/**
* Filter for HFiles that excludes reference files.
*/
public static class HFileFilter implements PathFilter {
// This pattern will accept 0.90+ style hex hfile names but reject reference files
final public static Pattern hfilePattern = Pattern.compile("^([0-9a-f]+)$");
final FileSystem fs;
public HFileFilter(FileSystem fs) {
this.fs = fs;
}
@Override
public boolean accept(Path rd) {
if (!hfilePattern.matcher(rd.getName()).matches()) {
return false;
}
try {
// only files
return !fs.getFileStatus(rd).isDir();
} catch (IOException ioe) {
// Maybe the file was moved or the fs was disconnected.
LOG.warn("Skipping file " + rd +" due to IOException", ioe);
return false;
}
}
}
/**
* @param conf
* @return Returns the filesystem of the hbase rootdir.
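
Taken together, the new RegionDirFilter, FamilyDirFilter, HFileFilter, and getRegionDirs() let callers walk the table/region/family/hfile layout without ad-hoc name checks. A sketch of how they might be combined to list every hfile under one table; the table name "mytable" is illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;

public class ListTableHFiles {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Illustrative table name; resolves to <hbase.rootdir>/mytable
    Path tableDir = FSUtils.getTablePath(FSUtils.getRootDir(conf), "mytable");

    // RegionDirFilter keeps only hex-named region dirs.
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      // FamilyDirFilter keeps only dirs whose names are legal column family names.
      for (FileStatus cf : fs.listStatus(regionDir, new FamilyDirFilter(fs))) {
        // HFileFilter keeps hex-named hfiles and skips reference files.
        for (FileStatus hf : fs.listStatus(cf.getPath(), new HFileFilter(fs))) {
          System.out.println(hf.getPath());
        }
      }
    }
  }
}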

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -36,6 +37,7 @@ import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@ -83,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@ -164,8 +167,10 @@ public class HBaseFsck {
private HConnection connection;
private HBaseAdmin admin;
private HTable meta;
private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers
protected ExecutorService executor; // threads to retrieve data from regionservers
private long startMillis = System.currentTimeMillis();
private HFileCorruptionChecker hfcc;
private int retcode = 0;
/***********
* Options
@ -242,6 +247,22 @@ public class HBaseFsck {
executor = new ScheduledThreadPoolExecutor(numThreads);
}
/**
* Constructor
*
* @param conf
* Configuration object
* @param exec
* executor service used to run hbck work in parallel
* @throws MasterNotRunningException
* if the master is not running
* @throws ZooKeeperConnectionException
* if unable to connect to ZooKeeper
*/
public HBaseFsck(Configuration conf, ExecutorService exec) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException {
this.conf = conf;
this.executor = exec;
}
/**
* To repair region consistency, one must call connect() in order to repair
* online state.
@ -3084,6 +3105,10 @@ public class HBaseFsck {
tablesIncluded.add(table);
}
Set<String> getIncludedTables() {
return new HashSet<String>(tablesIncluded);
}
/**
* We are interested in only those tables that have not changed their state in
* META during the last few seconds specified by hbase.admin.fsck.timelag
@ -3101,7 +3126,27 @@ public class HBaseFsck {
this.sidelineDir = new Path(sidelineDir);
}
protected static void printUsageAndExit() {
protected HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles);
}
public HFileCorruptionChecker getHFilecorruptionChecker() {
return hfcc;
}
public void setHFileCorruptionChecker(HFileCorruptionChecker hfcc) {
this.hfcc = hfcc;
}
public void setRetCode(int code) {
this.retcode = code;
}
public int getRetCode() {
return retcode;
}
protected HBaseFsck printUsageAndExit() {
System.err.println("Usage: fsck [opts] {only tables}");
System.err.println(" where [opts] are:");
System.err.println(" -help Display help options (this)");
@ -3115,7 +3160,8 @@ public class HBaseFsck {
System.err.println(" -metaonly Only check the state of ROOT and META tables.");
System.err.println(" -sidelineDir <hdfs://> HDFS path to backup existing meta and root.");
System.err.println(" Repair options: (expert features, use with caution!)");
System.err.println("");
System.err.println(" Metadata Repair options: (expert features, use with caution!)");
System.err.println(" -fix Try to fix region assignments. This is for backwards compatiblity");
System.err.println(" -fixAssignments Try to fix region assignments. Replaces the old -fix");
System.err.println(" -fixMeta Try to fix meta problems. This assumes HDFS region info is good.");
@ -3128,16 +3174,25 @@ public class HBaseFsck {
System.err.println(" -maxOverlapsToSideline <n> When fixing region overlaps, allow at most <n> regions to sideline per group. (n=" + DEFAULT_OVERLAPS_TO_SIDELINE +" by default)");
System.err.println(" -fixSplitParents Try to force offline split parents to be online.");
System.err.println(" -ignorePreCheckPermission ignore filesystem permission pre-check");
System.err.println("");
System.err.println(" Datafile Repair options: (expert features, use with caution!)");
System.err.println(" -checkCorruptHFiles Check all Hfiles by opening them to make sure they are valid");
System.err.println(" -sidelineCorruptHfiles Quarantine corrupted HFiles. implies -checkCorruptHfiles");
System.err.println("");
System.err.println(" Metadata Repair shortcuts");
System.err.println(" -repair Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps");
System.err.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
Runtime.getRuntime().exit(-2);
setRetCode(-2);
return this;
}
/**
* Main program
*
* @param args
* @throws Exception
*/
@ -3149,162 +3204,204 @@ public class HBaseFsck {
URI defaultFs = hbasedir.getFileSystem(conf).getUri();
conf.set("fs.defaultFS", defaultFs.toString()); // for hadoop 0.21+
conf.set("fs.default.name", defaultFs.toString()); // for hadoop 0.20
HBaseFsck fsck = new HBaseFsck(conf);
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
ExecutorService exec = new ScheduledThreadPoolExecutor(numThreads);
HBaseFsck hbck = new HBaseFsck(conf, exec);
hbck.exec(exec, args);
int retcode = hbck.getRetCode();
Runtime.getRuntime().exit(retcode);
}
public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
ServiceException, InterruptedException {
long sleepBeforeRerun = DEFAULT_SLEEP_BEFORE_RERUN;
boolean checkCorruptHFiles = false;
boolean sidelineCorruptHFiles = false;
// Process command-line args.
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-help") || cmd.equals("-h")) {
printUsageAndExit();
return printUsageAndExit();
} else if (cmd.equals("-details")) {
fsck.setDisplayFullReport();
setDisplayFullReport();
} else if (cmd.equals("-timelag")) {
if (i == args.length - 1) {
System.err.println("HBaseFsck: -timelag needs a value.");
printUsageAndExit();
return printUsageAndExit();
}
try {
long timelag = Long.parseLong(args[i+1]);
fsck.setTimeLag(timelag);
setTimeLag(timelag);
} catch (NumberFormatException e) {
System.err.println("-timelag needs a numeric value.");
printUsageAndExit();
return printUsageAndExit();
}
i++;
} else if (cmd.equals("-sleepBeforeRerun")) {
if (i == args.length - 1) {
System.err.println("HBaseFsck: -sleepBeforeRerun needs a value.");
printUsageAndExit();
return printUsageAndExit();
}
try {
sleepBeforeRerun = Long.parseLong(args[i+1]);
} catch (NumberFormatException e) {
System.err.println("-sleepBeforeRerun needs a numeric value.");
printUsageAndExit();
return printUsageAndExit();
}
i++;
} else if (cmd.equals("-sidelineDir")) {
if (i == args.length - 1) {
System.err.println("HBaseFsck: -sidelineDir needs a value.");
printUsageAndExit();
return printUsageAndExit();
}
i++;
fsck.setSidelineDir(args[i]);
setSidelineDir(args[i]);
} else if (cmd.equals("-fix")) {
System.err.println("This option is deprecated, please use " +
"-fixAssignments instead.");
fsck.setFixAssignments(true);
setFixAssignments(true);
} else if (cmd.equals("-fixAssignments")) {
fsck.setFixAssignments(true);
setFixAssignments(true);
} else if (cmd.equals("-fixMeta")) {
fsck.setFixMeta(true);
setFixMeta(true);
} else if (cmd.equals("-fixHdfsHoles")) {
fsck.setFixHdfsHoles(true);
setFixHdfsHoles(true);
} else if (cmd.equals("-fixHdfsOrphans")) {
fsck.setFixHdfsOrphans(true);
setFixHdfsOrphans(true);
} else if (cmd.equals("-fixHdfsOverlaps")) {
fsck.setFixHdfsOverlaps(true);
setFixHdfsOverlaps(true);
} else if (cmd.equals("-fixVersionFile")) {
fsck.setFixVersionFile(true);
setFixVersionFile(true);
} else if (cmd.equals("-sidelineBigOverlaps")) {
fsck.setSidelineBigOverlaps(true);
setSidelineBigOverlaps(true);
} else if (cmd.equals("-fixSplitParents")) {
fsck.setFixSplitParents(true);
setFixSplitParents(true);
} else if (cmd.equals("-ignorePreCheckPermission")) {
fsck.setIgnorePreCheckPermission(true);
setIgnorePreCheckPermission(true);
} else if (cmd.equals("-checkCorruptHFiles")) {
checkCorruptHFiles = true;
} else if (cmd.equals("-sidelineCorruptHFiles")) {
sidelineCorruptHFiles = true;
} else if (cmd.equals("-repair")) {
// this attempts to merge overlapping hdfs regions, needs testing
// under load
fsck.setFixHdfsHoles(true);
fsck.setFixHdfsOrphans(true);
fsck.setFixMeta(true);
fsck.setFixAssignments(true);
fsck.setFixHdfsOverlaps(true);
fsck.setFixVersionFile(true);
fsck.setSidelineBigOverlaps(true);
fsck.setFixSplitParents(false);
setFixHdfsHoles(true);
setFixHdfsOrphans(true);
setFixMeta(true);
setFixAssignments(true);
setFixHdfsOverlaps(true);
setFixVersionFile(true);
setSidelineBigOverlaps(true);
setFixSplitParents(false);
} else if (cmd.equals("-repairHoles")) {
// this will make all missing hdfs regions available but may lose data
fsck.setFixHdfsHoles(true);
fsck.setFixHdfsOrphans(false);
fsck.setFixMeta(true);
fsck.setFixAssignments(true);
fsck.setFixHdfsOverlaps(false);
fsck.setSidelineBigOverlaps(false);
fsck.setFixSplitParents(false);
setFixHdfsHoles(true);
setFixHdfsOrphans(false);
setFixMeta(true);
setFixAssignments(true);
setFixHdfsOverlaps(false);
setSidelineBigOverlaps(false);
setFixSplitParents(false);
} else if (cmd.equals("-maxOverlapsToSideline")) {
if (i == args.length - 1) {
System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
printUsageAndExit();
return printUsageAndExit();
}
try {
int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
fsck.setMaxOverlapsToSideline(maxOverlapsToSideline);
setMaxOverlapsToSideline(maxOverlapsToSideline);
} catch (NumberFormatException e) {
System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
printUsageAndExit();
return printUsageAndExit();
}
i++;
} else if (cmd.equals("-maxMerge")) {
if (i == args.length - 1) {
System.err.println("-maxMerge needs a numeric value argument.");
printUsageAndExit();
return printUsageAndExit();
}
try {
int maxMerge = Integer.parseInt(args[i+1]);
fsck.setMaxMerge(maxMerge);
setMaxMerge(maxMerge);
} catch (NumberFormatException e) {
System.err.println("-maxMerge needs a numeric value argument.");
printUsageAndExit();
return printUsageAndExit();
}
i++;
} else if (cmd.equals("-summary")) {
fsck.setSummary();
setSummary();
} else if (cmd.equals("-metaonly")) {
fsck.setCheckMetaOnly();
setCheckMetaOnly();
} else if (cmd.startsWith("-")) {
System.err.println("Unrecognized option:" + cmd);
printUsageAndExit();
return printUsageAndExit();
} else {
fsck.includeTable(cmd);
includeTable(cmd);
System.out.println("Allow checking/fixes for table: " + cmd);
}
}
// pre-check current user has FS write permission or not
try {
fsck.preCheckPermission();
preCheckPermission();
} catch (AccessControlException ace) {
Runtime.getRuntime().exit(-1);
} catch (IOException ioe) {
Runtime.getRuntime().exit(-1);
}
// do the real work of fsck
fsck.connect();
int code = fsck.onlineHbck();
// If we have changed the HBase state it is better to run fsck again
// do the real work of hbck
connect();
// if corrupt file mode is on, first fix them since they may be opened later
if (checkCorruptHFiles || sidelineCorruptHFiles) {
LOG.info("Checking all hfiles for corruption");
HFileCorruptionChecker hfcc = createHFileCorruptionChecker(sidelineCorruptHFiles);
setHFileCorruptionChecker(hfcc); // so we can get result
Collection<String> tables = getIncludedTables();
Collection<Path> tableDirs = new ArrayList<Path>();
Path rootdir = FSUtils.getRootDir(conf);
if (tables.size() > 0) {
for (String t : tables) {
tableDirs.add(FSUtils.getTablePath(rootdir, t));
}
} else {
tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
}
hfcc.checkTables(tableDirs);
PrintWriter out = new PrintWriter(System.out);
hfcc.report(out);
out.flush();
}
// check and fix table integrity, region consistency.
int code = onlineHbck();
setRetCode(code);
// If we have changed the HBase state it is better to run hbck again
// to see if we haven't broken something else in the process.
// We run it only once more because otherwise we can easily fall into
// an infinite loop.
if (fsck.shouldRerun()) {
if (shouldRerun()) {
try {
LOG.info("Sleeping " + sleepBeforeRerun + "ms before re-checking after fix...");
Thread.sleep(sleepBeforeRerun);
} catch (InterruptedException ie) {
Runtime.getRuntime().exit(code);
return this;
}
// Just report
fsck.setFixAssignments(false);
fsck.setFixMeta(false);
fsck.setFixHdfsHoles(false);
fsck.setFixHdfsOverlaps(false);
fsck.setFixVersionFile(false);
fsck.errors.resetErrors();
code = fsck.onlineHbck();
setFixAssignments(false);
setFixMeta(false);
setFixHdfsHoles(false);
setFixHdfsOverlaps(false);
setFixVersionFile(false);
errors.resetErrors();
code = onlineHbck();
setRetCode(code);
}
Runtime.getRuntime().exit(code);
return this;
}
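
With the reworked entry point, hbck can now be driven programmatically: construct an HBaseFsck with an ExecutorService, call exec() with the same arguments the command line accepts, then inspect the results. A sketch that mirrors what HbckTestingUtil.doHFileQuarantine does further down in this commit; the table name "mytable" is illustrative:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;

public class RunHbckQuarantine {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
    HBaseFsck hbck = new HBaseFsck(conf, exec);
    // Same flags the CLI parses; "mytable" constrains the check to one table.
    HBaseFsck res = hbck.exec(exec, new String[] {
        "-sidelineCorruptHFiles", "-ignorePreCheckPermission", "mytable" });
    // The corruption checker is only set because -sidelineCorruptHFiles was passed.
    HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
    System.out.println("retcode=" + res.getRetCode()
        + " corrupted=" + hfcc.getCorrupted().size()
        + " quarantined=" + hfcc.getQuarantined().size());
    exec.shutdown();
  }
}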
/**

View File

@ -0,0 +1,352 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.hbck;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
import org.apache.hadoop.hbase.util.FSUtils.RegionDirFilter;
import org.apache.hadoop.hbase.util.HBaseFsck;
/**
* This class marches through all of the regions' hfiles and verifies that
* they are all valid files. One just needs to instantiate the class, call
* checkTables(Collection&lt;Path&gt;) and then retrieve the corrupted hfiles (and
* quarantined files if in quarantining mode).
*
* The implementation currently parallelizes at the regionDir level.
*/
@InterfaceAudience.Private
public class HFileCorruptionChecker {
private static final Log LOG = LogFactory.getLog(HFileCorruptionChecker.class);
final Configuration conf;
final FileSystem fs;
final CacheConfig cacheConf;
final ExecutorService executor;
final Set<Path> corrupted = new ConcurrentSkipListSet<Path>();
final Set<Path> failures = new ConcurrentSkipListSet<Path>();
final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
final Set<Path> missing = new ConcurrentSkipListSet<Path>();
final boolean inQuarantineMode;
final AtomicInteger hfilesChecked = new AtomicInteger();
public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
boolean quarantine) throws IOException {
this.conf = conf;
this.fs = FileSystem.get(conf);
this.cacheConf = new CacheConfig(conf);
this.executor = executor;
this.inQuarantineMode = quarantine;
}
/**
* Checks a path to see if it is a valid hfile.
*
* @param p
* full Path to an HFile
* @throws IOException
* only thrown for connectivity-related problems; corruption is recorded rather than rethrown
*/
protected void checkHFile(Path p) throws IOException {
HFile.Reader r = null;
try {
r = HFile.createReader(fs, p, cacheConf);
} catch (CorruptHFileException che) {
LOG.warn("Found corrupt HFile " + p, che);
corrupted.add(p);
if (inQuarantineMode) {
Path dest = createQuarantinePath(p);
LOG.warn("Quarantining corrupt HFile " + p + " into " + dest);
boolean success = fs.mkdirs(dest.getParent());
success = success ? fs.rename(p, dest): false;
if (!success) {
failures.add(p);
} else {
quarantined.add(dest);
}
}
return;
} catch (FileNotFoundException fnfe) {
LOG.warn("HFile " + p + " was missing. Likely removed due to compaction/split?");
missing.add(p);
} finally {
hfilesChecked.addAndGet(1);
if (r != null) {
r.close(true);
}
}
}
/**
* Given a path, generates a new path to where we move a corrupted hfile (bad
* trailer, no trailer).
*
* @param hFile
* Path to a corrupt hfile (assumes that it is HBASE_DIR/table/region/cf/file)
* @return path to where corrupted files are stored. This should be
* HBASE_DIR/.corrupt/table/region/cf/file.
*/
Path createQuarantinePath(Path hFile) {
// extract the normal dirs structure
Path cfDir = hFile.getParent();
Path regionDir = cfDir.getParent();
Path tableDir = regionDir.getParent();
// build up the corrupted dirs structure
Path corruptBaseDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.hfile.quarantine.dir", HConstants.CORRUPT_DIR_NAME));
Path corruptTableDir = new Path(corruptBaseDir, tableDir.getName());
Path corruptRegionDir = new Path(corruptTableDir, regionDir.getName());
Path corruptFamilyDir = new Path(corruptRegionDir, cfDir.getName());
Path corruptHfile = new Path(corruptFamilyDir, hFile.getName());
return corruptHfile;
}
/**
* Check all files in a column family dir.
*
* @param cfDir
* column family directory
* @throws IOException
*/
protected void checkColFamDir(Path cfDir) throws IOException {
FileStatus[] hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
if (hfs.length == 0 && !fs.exists(cfDir)) {
// interestingly, listStatus does not throw an exception if the path does not exist.
LOG.warn("Colfam Directory " + cfDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(cfDir);
return;
}
for (FileStatus hfFs : hfs) {
Path hf = hfFs.getPath();
checkHFile(hf);
}
}
/**
* Check all column families in a region dir.
*
* @param regionDir
* region directory
* @throws IOException
*/
protected void checkRegionDir(Path regionDir) throws IOException {
FileStatus[] cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
if (cfs.length == 0 && !fs.exists(regionDir)) {
// interestingly, listStatus does not throw an exception if the path does not exist.
LOG.warn("Region Directory " + regionDir +
" does not exist. Likely due to concurrent split/compaction. Skipping.");
missing.add(regionDir);
return;
}
for (FileStatus cfFs : cfs) {
Path cfDir = cfFs.getPath();
checkColFamDir(cfDir);
}
}
/**
* Check all the regiondirs in the specified tableDir
*
* @param tableDir
* path to a table
* @throws IOException
*/
void checkTableDir(Path tableDir) throws IOException {
FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
if (rds.length == 0 && !fs.exists(tableDir)) {
// interestingly listStatus does not throw an exception if the path does not exist.
LOG.warn("Table Directory " + tableDir +
" does not exist. Likely due to concurrent delete. Skipping.");
missing.add(tableDir);
return;
}
// Parallelize check at the region dir level
List<RegionDirChecker> rdcs = new ArrayList<RegionDirChecker>();
List<Future<Void>> rdFutures;
for (FileStatus rdFs : rds) {
Path rdDir = rdFs.getPath();
RegionDirChecker work = new RegionDirChecker(rdDir);
rdcs.add(work);
}
// Submit and wait for completion
try {
rdFutures = executor.invokeAll(rdcs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Region dirs checking interrupted!", ie);
return;
}
for (int i = 0; i < rdFutures.size(); i++) {
Future<Void> f = rdFutures.get(i);
try {
f.get();
} catch (ExecutionException e) {
LOG.warn("Failed to quaratine an HFile in regiondir "
+ rdcs.get(i).regionDir, e.getCause());
// rethrow IOExceptions
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
// rethrow RuntimeExceptions
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
// this should never happen
LOG.error("Unexpected exception encountered", e);
return; // bailing out.
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Region dirs check interrupted!", ie);
// bailing out
return;
}
}
}
/**
* An individual work item for parallelized regiondir processing. This is
* intentionally an inner class so it can use the shared error sets and fs.
*/
private class RegionDirChecker implements Callable<Void> {
final Path regionDir;
RegionDirChecker(Path regionDir) {
this.regionDir = regionDir;
}
@Override
public Void call() throws IOException {
checkRegionDir(regionDir);
return null;
}
}
/**
* Check the specified table dirs for bad hfiles.
*/
public void checkTables(Collection<Path> tables) throws IOException {
for (Path t : tables) {
checkTableDir(t);
}
}
/**
* @return the set of check failure file paths after checkTables is called.
*/
public Collection<Path> getFailures() {
return new HashSet<Path>(failures);
}
/**
* @return the set of corrupted file paths after checkTables is called.
*/
public Collection<Path> getCorrupted() {
return new HashSet<Path>(corrupted);
}
/**
* @return number of hfiles checked in the last HFileCorruptionChecker run
*/
public int getHFilesChecked() {
return hfilesChecked.get();
}
/**
* @return the set of successfully quarantined paths after checkTables is called.
*/
public Collection<Path> getQuarantined() {
return new HashSet<Path>(quarantined);
}
/**
* @return the set of paths that were missing. Likely due to deletion/moves from
* compaction or flushes.
*/
public Collection<Path> getMissing() {
return new HashSet<Path>(missing);
}
/**
* Print a human-readable summary of hfile quarantining operations.
* @param out the PrintWriter the report is written to
*/
public void report(PrintWriter out) {
out.println("Checked " + hfilesChecked.get() + " hfile for corruption");
out.println(" HFiles corrupted: " + corrupted.size());
if (inQuarantineMode) {
out.println(" HFiles successfully quarantined: " + quarantined.size());
for (Path sq : quarantined) {
out.println(" " + sq);
}
out.println(" HFiles failed quarantine: " + failures.size());
for (Path fq : failures) {
out.println(" " + fq);
}
}
out.println(" HFiles moved while checking: " + missing.size());
for (Path mq : missing) {
out.println(" " + mq);
}
String initialState = (corrupted.size() == 0) ? "OK" : "CORRUPTED";
String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
: "CORRUPTED";
if (inQuarantineMode) {
out.println("Summary: " + initialState + " => " + fixedState);
} else {
out.println("Summary: " + initialState);
}
}
}
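
Outside of hbck, HFileCorruptionChecker can be driven directly, following the flow the class comment describes: construct it with an executor, hand it the table dirs, then read back the result sets or print the report. A sketch assuming quarantine mode and a default HBase configuration, mirroring the block hbck's exec() runs:

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;

public class CheckHFiles {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    ExecutorService exec = new ScheduledThreadPoolExecutor(10);
    // true = quarantine corrupt hfiles into the .corrupt dir as they are found
    HFileCorruptionChecker hfcc = new HFileCorruptionChecker(conf, exec, true);

    // Check every table under the HBase root dir.
    Path rootDir = FSUtils.getRootDir(conf);
    Collection<Path> tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootDir);
    hfcc.checkTables(tableDirs);

    PrintWriter out = new PrintWriter(System.out);
    hfcc.report(out);
    out.flush();
    exec.shutdown();
  }
}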

View File

@ -140,7 +140,7 @@ public class TestFixedFileTrailer {
try {
readTrailer(trailerPath);
fail("Exception expected");
} catch (IOException ex) {
} catch (IllegalArgumentException ex) {
// Make it easy to debug this.
String msg = ex.getMessage();
String cleanMsg = msg.replaceAll(

View File

@ -30,12 +30,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.junit.experimental.categories.Category;
@ -62,14 +65,12 @@ public class TestHFile extends HBaseTestCase {
@Override
public void setUp() throws Exception {
startingMetrics = SchemaMetrics.getMetricsSnapshot();
super.setUp();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
SchemaMetrics.validateMetricChanges(startingMetrics);
}
@ -90,6 +91,61 @@ public class TestHFile extends HBaseTestCase {
assertNull(r.getLastKey());
}
/**
* Create a 0-length hfile and show that opening it fails
*/
public void testCorrupt0LengthHFile() throws IOException {
if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path f = new Path(ROOT_DIR, getName());
FSDataOutputStream fsos = fs.create(f);
fsos.close();
try {
Reader r = HFile.createReader(fs, f, cacheConf);
} catch (CorruptHFileException che) {
// Expected failure
return;
}
fail("Should have thrown exception");
}
public static void truncateFile(FileSystem fs, Path src, Path dst) throws IOException {
FileStatus fst = fs.getFileStatus(src);
long len = fst.getLen();
len = len / 2 ;
// create a truncated hfile
FSDataOutputStream fdos = fs.create(dst);
byte[] buf = new byte[(int)len];
FSDataInputStream fdis = fs.open(src);
fdis.read(buf);
fdos.write(buf);
fdis.close();
fdos.close();
}
/**
* Create a truncated hfile and verify that an exception is thrown.
*/
public void testCorruptTruncatedHFile() throws IOException {
if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path f = new Path(ROOT_DIR, getName());
Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f).create();
writeSomeRecords(w, 0, 100);
w.close();
Path trunc = new Path(f.getParent(), "truncated");
truncateFile(fs, w.getPath(), trunc);
try {
Reader r = HFile.createReader(fs, trunc, cacheConf);
} catch (CorruptHFileException che) {
// Expected failure
return;
}
fail("Should have thrown exception");
}
// write some records into the tfile
// write them twice
private int writeSomeRecords(Writer writer, int start, int n)

View File

@ -35,10 +35,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
@ -48,7 +51,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionStates;
@ -71,6 +75,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.HBaseFsck.HbckInfo;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -78,18 +84,20 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import com.google.common.collect.Multimap;
/**
* This tests HBaseFsck's ability to detect reasons for inconsistent tables.
*/
@Category(MediumTests.class)
@Category(LargeTests.class)
public class TestHBaseFsck {
final static Log LOG = LogFactory.getLog(TestHBaseFsck.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static Configuration conf = TEST_UTIL.getConfiguration();
private final static byte[] FAM = Bytes.toBytes("fam");
private final static String FAM_STR = "fam";
private final static byte[] FAM = Bytes.toBytes(FAM_STR);
private final static int REGION_ONLINE_TIMEOUT = 800;
private static RegionStates regionStates;
@ -1275,8 +1283,187 @@ public class TestHBaseFsck {
deleteTable(table);
}
}
/**
* We don't have an easy way to verify that a flush completed, so we loop until we find a
* legitimate hfile and return it.
* @param fs
* @param table
* @return Path of a flushed hfile.
* @throws IOException
*/
Path getFlushedHFile(FileSystem fs, String table) throws IOException {
Path tableDir= FSUtils.getTablePath(FSUtils.getRootDir(conf), table);
Path regionDir = FSUtils.getRegionDirs(fs, tableDir).get(0);
Path famDir = new Path(regionDir, FAM_STR);
// keep doing this until we get a legit hfile
while (true) {
FileStatus[] hfFss = fs.listStatus(famDir);
if (hfFss.length == 0) {
continue;
}
for (FileStatus hfs : hfFss) {
if (!hfs.isDir()) {
return hfs.getPath();
}
}
}
}
/**
* This creates a table and then corrupts an hfile. Hbck should quarantine the file.
*/
@Test(timeout=120000)
public void testQuarantineCorruptHFile() throws Exception {
String table = name.getMethodName();
try {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
TEST_UTIL.getHBaseAdmin().flush(table); // flush is async.
FileSystem fs = FileSystem.get(conf);
Path hfile = getFlushedHFile(fs, table);
// Disable the table so we can safely corrupt one of its hfiles on hdfs
TEST_UTIL.getHBaseAdmin().disableTable(table);
// create new corrupt file called deadbeef (valid hfile name)
Path corrupt = new Path(hfile.getParent(), "deadbeef");
TestHFile.truncateFile(fs, hfile, corrupt);
LOG.info("Created corrupted file " + corrupt);
HBaseFsck.debugLsr(conf, FSUtils.getRootDir(conf));
// we cannot enable the table here because enable never finishes due to the corrupt region.
HBaseFsck res = HbckTestingUtil.doHFileQuarantine(conf, table);
assertEquals(res.getRetCode(), 0);
HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
assertEquals(hfcc.getHFilesChecked(), 5);
assertEquals(hfcc.getCorrupted().size(), 1);
assertEquals(hfcc.getFailures().size(), 0);
assertEquals(hfcc.getQuarantined().size(), 1);
assertEquals(hfcc.getMissing().size(), 0);
// It's been fixed, verify that we can enable.
TEST_UTIL.getHBaseAdmin().enableTable(table);
} finally {
deleteTable(table);
}
}
private void doQuarantineTest(String table, HBaseFsck hbck, int check, int corrupt, int fail,
int quar, int missing) throws Exception {
try {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
TEST_UTIL.getHBaseAdmin().flush(table); // flush is async.
// Disable the table before running hbck's corruption check/quarantine
TEST_UTIL.getHBaseAdmin().disableTable(table);
String[] args = {"-sidelineCorruptHFiles", "-repairHoles", "-ignorePreCheckPermission", table};
ExecutorService exec = new ScheduledThreadPoolExecutor(10);
HBaseFsck res = hbck.exec(exec, args);
HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
assertEquals(hfcc.getHFilesChecked(), check);
assertEquals(hfcc.getCorrupted().size(), corrupt);
assertEquals(hfcc.getFailures().size(), fail);
assertEquals(hfcc.getQuarantined().size(), quar);
assertEquals(hfcc.getMissing().size(), missing);
// it's been fixed, verify that we can enable
TEST_UTIL.getHBaseAdmin().enableTable(table);
} finally {
deleteTable(table);
}
}
/**
* This creates a table and simulates the race situation where a concurrent compaction or split
* has removed an hfile after the corruption checker learned about it.
*/
@Test(timeout=120000)
public void testQuarantineMissingHFile() throws Exception {
String table = name.getMethodName();
ExecutorService exec = new ScheduledThreadPoolExecutor(10);
// inject a fault in the hfcc created.
final FileSystem fs = FileSystem.get(conf);
HBaseFsck hbck = new HBaseFsck(conf, exec) {
public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
boolean attemptedFirstHFile = false;
protected void checkHFile(Path p) throws IOException {
if (!attemptedFirstHFile) {
attemptedFirstHFile = true;
assertTrue(fs.delete(p, true)); // make sure delete happened.
}
super.checkHFile(p);
}
};
}
};
doQuarantineTest(table, hbck, 4, 0, 0, 0, 1); // 4 attempted, but 1 missing.
}
/**
* This creates a table and simulates the race situation where a concurrent compaction or split
* has removed a colfam dir before the corruption checker got to it.
*/
@Test(timeout=120000)
public void testQuarantineMissingFamdir() throws Exception {
String table = name.getMethodName();
ExecutorService exec = new ScheduledThreadPoolExecutor(10);
// inject a fault in the hfcc created.
final FileSystem fs = FileSystem.get(conf);
HBaseFsck hbck = new HBaseFsck(conf, exec) {
public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
boolean attemptedFirstFamDir = false;
protected void checkColFamDir(Path p) throws IOException {
if (!attemptedFirstFamDir) {
attemptedFirstFamDir = true;
assertTrue(fs.delete(p, true)); // make sure delete happened.
}
super.checkColFamDir(p);
}
};
}
};
doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
}
/**
* This creates a table and simulates the race situation where a concurrent compaction or split
* has removed a region dir before the corruption checker got to it.
*/
@Test(timeout=120000)
public void testQuarantineMissingRegionDir() throws Exception {
String table = name.getMethodName();
ExecutorService exec = new ScheduledThreadPoolExecutor(10);
// inject a fault in the hfcc created.
final FileSystem fs = FileSystem.get(conf);
HBaseFsck hbck = new HBaseFsck(conf, exec) {
public HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles) {
boolean attemptedFirstRegionDir = false;
protected void checkRegionDir(Path p) throws IOException {
if (!attemptedFirstRegionDir) {
attemptedFirstRegionDir = true;
assertTrue(fs.delete(p, true)); // make sure delete happened.
}
super.checkRegionDir(p);
}
};
}
};
doQuarantineTest(table, hbck, 3, 0, 0, 0, 1);
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
@org.junit.Rule
public TestName name = new TestName();
}

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.HBaseFsck;
@ -59,6 +61,21 @@ public class HbckTestingUtil {
return fsck;
}
/**
* Runs hbck with the -sidelineCorruptHFiles option
* @param conf
* @param table table constraint
* @return the HBaseFsck instance; call getRetCode() on it for the return code
* @throws Exception
*/
public static HBaseFsck doHFileQuarantine(Configuration conf, String table) throws Exception {
String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission", table};
ExecutorService exec = new ScheduledThreadPoolExecutor(10);
HBaseFsck hbck = new HBaseFsck(conf, exec);
hbck.exec(exec, args);
return hbck;
}
public static void assertNoErrors(HBaseFsck fsck) throws Exception {
List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
assertEquals(new ArrayList<ERROR_CODE>(), errs);