HBASE-13806 Check the mob files when there are mob-enabled columns in HFileCorruptionChecker. (Jingcheng)

This commit is contained in:
anoopsjohn 2015-06-08 11:17:43 +05:30
parent efbef296d6
commit 13fe542bcc
2 changed files with 307 additions and 0 deletions

View File

@ -39,9 +39,11 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.FamilyDirFilter;
import org.apache.hadoop.hbase.util.FSUtils.HFileFilter;
@ -68,8 +70,13 @@ public class HFileCorruptionChecker {
final Set<Path> failures = new ConcurrentSkipListSet<Path>();
final Set<Path> quarantined = new ConcurrentSkipListSet<Path>();
final Set<Path> missing = new ConcurrentSkipListSet<Path>();
// Mob files for which creating an HFile reader raised CorruptHFileException.
final Set<Path> corruptedMobFiles = new ConcurrentSkipListSet<Path>();
// Corrupt mob files that could not be moved into quarantine (mkdirs or rename failed).
final Set<Path> failureMobFiles = new ConcurrentSkipListSet<Path>();
// Mob files and mob directories that were not found while checking.
final Set<Path> missedMobFiles = new ConcurrentSkipListSet<Path>();
// Quarantine destination paths of corrupt mob files that were moved successfully.
final Set<Path> quarantinedMobFiles = new ConcurrentSkipListSet<Path>();
final boolean inQuarantineMode;
final AtomicInteger hfilesChecked = new AtomicInteger();
// Incremented once for every checkMobFile invocation, whatever its outcome.
final AtomicInteger mobFilesChecked = new AtomicInteger();
public HFileCorruptionChecker(Configuration conf, ExecutorService executor,
boolean quarantine) throws IOException {
@ -176,6 +183,109 @@ public class HFileCorruptionChecker {
}
}
/**
 * Runs the mob file check on every file found in a mob column family directory.
 * If the directory itself cannot be found it is recorded as missed and nothing is checked.
 *
 * @param cfDir
 *          mob column family directory
 * @throws IOException if listing the directory or checking one of its files fails
 */
protected void checkMobColFamDir(Path cfDir) throws IOException {
  FileStatus[] mobFileStatuses = null;
  boolean dirMissing;
  try {
    mobFileStatuses = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
    dirMissing = false;
  } catch (FileNotFoundException fnfe) {
    // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
    dirMissing = true;
  }
  if (!dirMissing) {
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    dirMissing = mobFileStatuses.length == 0 && !fs.exists(cfDir);
  }
  if (dirMissing) {
    LOG.warn("Mob colfam Directory " + cfDir +
        " does not exist. Likely the table is deleted. Skipping.");
    missedMobFiles.add(cfDir);
    return;
  }
  for (FileStatus mobFileStatus : mobFileStatuses) {
    checkMobFile(mobFileStatus.getPath());
  }
}
/**
 * Verifies that a path can be opened as a mob file. A corrupt file is recorded and, in
 * quarantine mode, moved to its quarantine location; a file that has disappeared is
 * recorded as missed. The checked-file counter is bumped in every case.
 *
 * @param p
 *          full Path to a mob file.
 * @throws IOException
 *           This is a connectivity related exception
 */
protected void checkMobFile(Path p) throws IOException {
  HFile.Reader reader = null;
  try {
    reader = HFile.createReader(fs, p, cacheConf, conf);
  } catch (CorruptHFileException che) {
    LOG.warn("Found corrupt mob file " + p, che);
    corruptedMobFiles.add(p);
    if (!inQuarantineMode) {
      return;
    }
    Path dest = createQuarantinePath(p);
    LOG.warn("Quarantining corrupt mob file " + p + " into " + dest);
    // The rename is only attempted when the quarantine parent directory could be created.
    boolean moved = fs.mkdirs(dest.getParent()) && fs.rename(p, dest);
    if (moved) {
      quarantinedMobFiles.add(dest);
    } else {
      failureMobFiles.add(p);
    }
  } catch (FileNotFoundException fnfe) {
    LOG.warn("Mob file " + p + " was missing. Likely removed due to compaction?");
    missedMobFiles.add(p);
  } finally {
    mobFilesChecked.incrementAndGet();
    if (reader != null) {
      reader.close(true);
    }
  }
}
/**
 * Walks every column family directory under a mob region directory and checks the mob
 * files inside each one. Returns silently when the mob region directory does not exist.
 * @param regionDir The mob region directory
 * @throws IOException if listing the directory or checking a family directory fails
 */
private void checkMobRegionDir(Path regionDir) throws IOException {
  if (!fs.exists(regionDir)) {
    return;
  }
  FileStatus[] familyDirs = null;
  boolean dirMissing;
  try {
    familyDirs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
    dirMissing = false;
  } catch (FileNotFoundException fnfe) {
    // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
    dirMissing = true;
  }
  if (!dirMissing) {
    // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
    dirMissing = familyDirs.length == 0 && !fs.exists(regionDir);
  }
  if (dirMissing) {
    LOG.warn("Mob directory " + regionDir
        + " does not exist. Likely the table is deleted. Skipping.");
    missedMobFiles.add(regionDir);
    return;
  }
  for (FileStatus familyDir : familyDirs) {
    checkMobColFamDir(familyDir.getPath());
  }
}
/**
* Check all column families in a region dir.
*
@ -236,6 +346,8 @@ public class HFileCorruptionChecker {
rdcs.add(work);
}
// add mob region
rdcs.add(createMobRegionDirChecker(tableDir));
// Submit and wait for completion
try {
rdFutures = executor.invokeAll(rdcs);
@ -292,6 +404,34 @@ public class HFileCorruptionChecker {
}
}
/**
 * Work item that checks a single mob region directory, so mob checking can be submitted
 * alongside the other region directory checkers. Deliberately an inner class: it relies
 * on the enclosing checker's shared error sets and fs.
 */
private class MobRegionDirChecker extends RegionDirChecker {

  MobRegionDirChecker(Path mobRegionDir) {
    super(mobRegionDir);
  }

  /** Checks the mob region directory this work item was created with. */
  @Override
  public Void call() throws IOException {
    checkMobRegionDir(regionDir);
    return null;
  }
}
/**
 * Builds the mob work item for a table: resolves the table name from its directory,
 * looks up the matching mob region path and wraps it in a MobRegionDirChecker.
 * @param tableDir The current table directory.
 * @return An instance of MobRegionDirChecker.
 */
private MobRegionDirChecker createMobRegionDirChecker(Path tableDir) {
  Path mobRegionDir = MobUtils.getMobRegionPath(conf, FSUtils.getTableName(tableDir));
  return new MobRegionDirChecker(mobRegionDir);
}
/**
* Check the specified table dirs for bad hfiles.
*/
@ -337,6 +477,42 @@ public class HFileCorruptionChecker {
return new HashSet<Path>(missing);
}
/**
 * @return a copy of the mob file paths that failed to be quarantined, valid after
 *         checkTables is called.
 */
public Collection<Path> getFailureMobFiles() {
  Collection<Path> snapshot = new HashSet<Path>(failureMobFiles);
  return snapshot;
}
/**
 * @return a copy of the corrupted mob file paths, valid after checkTables is called.
 */
public Collection<Path> getCorruptedMobFiles() {
  Collection<Path> snapshot = new HashSet<Path>(corruptedMobFiles);
  return snapshot;
}
/**
 * @return the current value of the mob-files-checked counter
 */
public int getMobFilesChecked() {
  int checkedSoFar = mobFilesChecked.get();
  return checkedSoFar;
}
/**
 * @return a copy of the quarantine destination paths of the mob files that were moved
 *         successfully, valid after checkTables is called.
 */
public Collection<Path> getQuarantinedMobFiles() {
  Collection<Path> snapshot = new HashSet<Path>(quarantinedMobFiles);
  return snapshot;
}
/**
 * @return a copy of the mob paths that were missing. Likely due to table deletion or
 *         deletion/moves from compaction.
 */
public Collection<Path> getMissedMobFiles() {
  Collection<Path> snapshot = new HashSet<Path>(missedMobFiles);
  return snapshot;
}
/**
* Print a human readable summary of hfile quarantining operations.
* @param out
@ -363,10 +539,31 @@ public class HFileCorruptionChecker {
String fixedState = (corrupted.size() == quarantined.size()) ? "OK"
: "CORRUPTED";
// print mob-related report
if (inQuarantineMode) {
out.print(" Mob files successfully quarantined: " + quarantinedMobFiles.size());
for (Path sq : quarantinedMobFiles) {
out.print(" " + sq);
}
out.print(" Mob files failed quarantine: " + failureMobFiles.size());
for (Path fq : failureMobFiles) {
out.print(" " + fq);
}
}
out.print(" Mob files moved while checking: " + missedMobFiles.size());
for (Path mq : missedMobFiles) {
out.print(" " + mq);
}
String initialMobState = (corruptedMobFiles.size() == 0) ? "OK" : "CORRUPTED";
String fixedMobState = (corruptedMobFiles.size() == quarantinedMobFiles.size()) ? "OK"
: "CORRUPTED";
if (inQuarantineMode) {
out.print("Summary: " + initialState + " => " + fixedState);
out.print("Mob summary: " + initialMobState + " => " + fixedMobState);
} else {
out.print("Summary: " + initialState);
out.print("Mob summary: " + initialMobState);
}
}
}

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -95,6 +96,8 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -437,6 +440,31 @@ public class TestHBaseFsck {
tbl.flushCommits();
}
/**
 * Creates a clean table whose single column family is mob-enabled with a mob threshold
 * of 0, then writes one cell per entry of ROWKEYS into it.
 *
 * @param tablename The name of a table to be created.
 * @throws Exception
 */
void setupMobTable(TableName tablename) throws Exception {
  HTableDescriptor tableDesc = new HTableDescriptor(tablename);
  HColumnDescriptor mobFamily = new HColumnDescriptor(Bytes.toString(FAM));
  mobFamily.setMobEnabled(true);
  mobFamily.setMobThreshold(0);
  tableDesc.addFamily(mobFamily); // If a table has no CF's it doesn't get checked
  createTable(TEST_UTIL, tableDesc, SPLITS);

  tbl = (HTable) connection.getTable(tablename, tableExecutorService);
  List<Put> rowPuts = new ArrayList<Put>();
  for (byte[] rowKey : ROWKEYS) {
    Put put = new Put(rowKey);
    put.add(FAM, Bytes.toBytes("val"), rowKey);
    rowPuts.add(put);
  }
  tbl.put(rowPuts);
  tbl.flushCommits();
}
/**
* Counts the number of row to verify data loss or non-dataloss.
*/
@ -2120,6 +2148,44 @@ public class TestHBaseFsck {
}
}
/**
 * Gets a flushed mob file, polling the mob column family directory until a regular
 * (non-directory) entry shows up. Callers should run under a test timeout because this
 * method waits for as long as no such file exists.
 * @param fs The current file system.
 * @param table The current table name.
 * @return Path of a flushed mob file.
 * @throws IOException if listing the directory fails, or (as InterruptedIOException) if
 *           the thread is interrupted while waiting between polls
 */
Path getFlushedMobFile(FileSystem fs, TableName table) throws IOException {
  Path regionDir = MobUtils.getMobRegionPath(conf, table);
  Path famDir = new Path(regionDir, FAM_STR);

  // keep doing this until we get a legit mob file
  while (true) {
    for (FileStatus status : fs.listStatus(famDir)) {
      if (!status.isDirectory()) {
        return status.getPath();
      }
    }
    // Nothing usable yet: pause briefly instead of spinning on the file system.
    try {
      Thread.sleep(100);
    } catch (InterruptedException ie) {
      // Restore the interrupt flag and surface the interruption through the declared type.
      Thread.currentThread().interrupt();
      java.io.InterruptedIOException iioe = new java.io.InterruptedIOException(
          "Interrupted while waiting for a flushed mob file in " + famDir);
      iioe.initCause(ie);
      throw iioe;
    }
  }
}
/**
 * Derives a fresh mob file name from an existing one: the start key and date are taken
 * from the old name and combined with a newly generated, dash-free random UUID.
 * @param oldFileName The old mob file name.
 * @return The new mob file name.
 */
String createMobFileName(String oldFileName) {
  MobFileName original = MobFileName.create(oldFileName);
  String uniqueSuffix = UUID.randomUUID().toString().replaceAll("-", "");
  MobFileName renamed =
      MobFileName.create(original.getStartKey(), original.getDate(), uniqueSuffix);
  return renamed.getFileName();
}
/**
* This creates a table and then corrupts an hfile. Hbck should quarantine the file.
*/
@ -2160,6 +2226,50 @@ public class TestHBaseFsck {
}
}
/**
 * This creates a mob-enabled table, adds a truncated (corrupt) copy of a flushed mob file
 * next to the original, and runs the hfile quarantine. Hbck should quarantine exactly the
 * corrupt mob file and report no problems for the regular hfiles.
 */
@Test(timeout=180000)
public void testQuarantineCorruptMobFile() throws Exception {
  TableName table = TableName.valueOf(name.getMethodName());
  try {
    setupMobTable(table);
    assertEquals(ROWKEYS.length, countRows());
    admin.flush(table);

    FileSystem fs = FileSystem.get(conf);
    Path mobFile = getFlushedMobFile(fs, table);
    admin.disableTable(table);
    // create new corrupt mob file.
    String corruptMobFile = createMobFileName(mobFile.getName());
    Path corrupt = new Path(mobFile.getParent(), corruptMobFile);
    TestHFile.truncateFile(fs, mobFile, corrupt);
    LOG.info("Created corrupted mob file " + corrupt);
    HBaseFsck.debugLsr(conf, FSUtils.getRootDir(conf));
    HBaseFsck.debugLsr(conf, MobUtils.getMobHome(conf));

    // A corrupt mob file doesn't abort the start of regions, so we can enable the table.
    admin.enableTable(table);
    HBaseFsck res = HbckTestingUtil.doHFileQuarantine(conf, table);
    // Expected value goes first so that failure messages report expected/actual correctly.
    assertEquals(0, res.getRetCode());
    HFileCorruptionChecker hfcc = res.getHFilecorruptionChecker();
    assertEquals(4, hfcc.getHFilesChecked());
    assertEquals(0, hfcc.getCorrupted().size());
    assertEquals(0, hfcc.getFailures().size());
    assertEquals(0, hfcc.getQuarantined().size());
    assertEquals(0, hfcc.getMissing().size());
    assertEquals(5, hfcc.getMobFilesChecked());
    assertEquals(1, hfcc.getCorruptedMobFiles().size());
    assertEquals(0, hfcc.getFailureMobFiles().size());
    assertEquals(1, hfcc.getQuarantinedMobFiles().size());
    assertEquals(0, hfcc.getMissedMobFiles().size());
    String quarantinedMobFile = hfcc.getQuarantinedMobFiles().iterator().next().getName();
    assertEquals(corruptMobFile, quarantinedMobFile);
  } finally {
    cleanupTable(table);
  }
}
/**
* Test that use this should have a timeout, because this method could potentially wait forever.
*/