HBASE-10401 [hbck] perform overlap group merges in parallel

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1560774 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Hsieh 2014-01-23 18:10:52 +00:00
parent 5cd2a757f3
commit 8b49f4297c

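The change below swaps the serial loop over overlap groups for Callable work items submitted in one batch to hbck's existing executor via invokeAll, with the old serial path kept behind the hbasefsck.overlap.merge.parallel switch. As a rough, self-contained sketch of that pattern only (the class name, region names, and pool size here are made up for illustration and are not part of the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelGroupMerge {
  public static void main(String[] args) throws InterruptedException {
    // Stand-in for the independent overlap groups hbck discovers.
    List<List<String>> overlapGroups = Arrays.asList(
        Arrays.asList("regionA", "regionB"),
        Arrays.asList("regionC", "regionD", "regionE"));

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // One Callable per group, mirroring the WorkItemOverlapMerge tasks added below.
      List<Callable<Void>> work = new ArrayList<Callable<Void>>();
      for (final List<String> group : overlapGroups) {
        work.add(new Callable<Void>() {
          @Override
          public Void call() {
            // The real code calls handler.handleOverlapGroup(group) here.
            System.out.println("[" + Thread.currentThread().getName()
                + "] merging " + group);
            return null;
          }
        });
      }

      // Submit the whole batch and wait for every task to finish.
      List<Future<Void>> results = executor.invokeAll(work);

      // Surface per-group failures without aborting the remaining groups.
      for (int i = 0; i < results.size(); i++) {
        try {
          results.get(i).get();
        } catch (ExecutionException e) {
          System.err.println("group " + i + " failed: " + e.getCause());
        }
      }
    } finally {
      executor.shutdown();
    }
  }
}

Checking each Future individually is what lets one failed group be logged while the others still complete, which matches the ExecutionException handling in handleOverlapsParallel in the diff below.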

@@ -183,7 +183,8 @@ public class HBaseFsck extends Configured {
   private HConnection connection;
   private HBaseAdmin admin;
   private HTable meta;
-  protected ExecutorService executor; // threads to retrieve data from regionservers
+  // threads to do ||izable tasks: retrieve data from regionservers, handle overlapping regions
+  protected ExecutorService executor;
   private long startMillis = System.currentTimeMillis();
   private HFileCorruptionChecker hfcc;
   private int retcode = 0;
@@ -2006,8 +2007,8 @@ public class HBaseFsck extends Configured {
    */
  public int mergeRegionDirs(Path targetRegionDir, HbckInfo contained) throws IOException {
    int fileMoves = 0;
-    LOG.debug("Contained region dir after close and pause");
+    String thread = Thread.currentThread().getName();
+    LOG.debug("[" + thread + "] Contained region dir after close and pause");
    debugLsr(contained.getHdfsRegionDir());
    // rename the contained into the container.
@@ -2019,8 +2020,8 @@ public class HBaseFsck extends Configured {
      // region we are attempting to merge in is not present!  Since this is a merge, there is
      // no harm skipping this region if it does not exist.
      if (!fs.exists(contained.getHdfsRegionDir())) {
-        LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " is missing. " +
-          "Assuming already sidelined or moved.");
+        LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir()
+            + " is missing. Assuming already sidelined or moved.");
      } else {
        sidelineRegionDir(fs, contained);
      }
@@ -2029,7 +2030,8 @@ public class HBaseFsck extends Configured {
    if (dirs == null) {
      if (!fs.exists(contained.getHdfsRegionDir())) {
-        LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " already sidelined.");
+        LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir()
+            + " already sidelined.");
      } else {
        sidelineRegionDir(fs, contained);
      }
@@ -2050,7 +2052,7 @@ public class HBaseFsck extends Configured {
        continue;
      }
-      LOG.info("Moving files from " + src + " into containing region " + dst);
+      LOG.info("[" + thread + "] Moving files from " + src + " into containing region " + dst);
      // FileSystem.rename is inconsistent with directories -- if the
      // dst (foo/a) exists and is a dir, and the src (foo/b) is a dir,
      // it moves the src into the dst dir resulting in (foo/a/b).  If
@@ -2061,19 +2063,37 @@ public class HBaseFsck extends Configured {
          fileMoves++;
        }
      }
-      LOG.debug("Sideline directory contents:");
+      LOG.debug("[" + thread + "] Sideline directory contents:");
      debugLsr(targetRegionDir);
    }

    // if all success.
    sidelineRegionDir(fs, contained);
-    LOG.info("Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
+    LOG.info("[" + thread + "] Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
        getSidelineDir());
    debugLsr(contained.getHdfsRegionDir());
    return fileMoves;
  }

+  static class WorkItemOverlapMerge implements Callable<Void> {
+    private TableIntegrityErrorHandler handler;
+    Collection<HbckInfo> overlapgroup;
+
+    WorkItemOverlapMerge(Collection<HbckInfo> overlapgroup, TableIntegrityErrorHandler handler) {
+      this.handler = handler;
+      this.overlapgroup = overlapgroup;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      handler.handleOverlapGroup(overlapgroup);
+      return null;
+    }
+  };
+
  /**
   * Maintain information about a particular table.
   */
@@ -2302,6 +2322,8 @@ public class HBaseFsck extends Configured {
     * Cases:
     * - Clean regions that overlap
     * - Only .oldlogs regions (can't find start/stop range, or figure out)
+     *
+     * This is basically threadsafe, except for the fixer increment in mergeOverlaps.
     */
    @Override
    public void handleOverlapGroup(Collection<HbckInfo> overlap)
@@ -2329,7 +2351,8 @@ public class HBaseFsck extends Configured {
    void mergeOverlaps(Collection<HbckInfo> overlap)
        throws IOException {
-      LOG.info("== Merging regions into one region: "
+      String thread = Thread.currentThread().getName();
+      LOG.info("== [" + thread + "] Merging regions into one region: "
        + Joiner.on(",").join(overlap));
      // get the min / max range and close all concerned regions
      Pair<byte[], byte[]> range = null;
@@ -2347,25 +2370,25 @@ public class HBaseFsck extends Configured {
          }
        }
        // need to close files so delete can happen.
-        LOG.debug("Closing region before moving data around: " + hi);
-        LOG.debug("Contained region dir before close");
+        LOG.debug("[" + thread + "] Closing region before moving data around: " + hi);
+        LOG.debug("[" + thread + "] Contained region dir before close");
        debugLsr(hi.getHdfsRegionDir());
        try {
-          LOG.info("Closing region: " + hi);
+          LOG.info("[" + thread + "] Closing region: " + hi);
          closeRegion(hi);
        } catch (IOException ioe) {
-          LOG.warn("Was unable to close region " + hi
+          LOG.warn("[" + thread + "] Was unable to close region " + hi
            + ". Just continuing... ", ioe);
        } catch (InterruptedException e) {
-          LOG.warn("Was unable to close region " + hi
+          LOG.warn("[" + thread + "] Was unable to close region " + hi
            + ". Just continuing... ", e);
        }
        try {
-          LOG.info("Offlining region: " + hi);
+          LOG.info("[" + thread + "] Offlining region: " + hi);
          offline(hi.getRegionName());
        } catch (IOException ioe) {
-          LOG.warn("Unable to offline region from master: " + hi
+          LOG.warn("[" + thread + "] Unable to offline region from master: " + hi
            + ". Just continuing... ", ioe);
        }
      }
@@ -2376,7 +2399,7 @@ public class HBaseFsck extends Configured {
      HRegionInfo newRegion = new HRegionInfo(htd.getTableName(), range.getFirst(),
          range.getSecond());
      HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, newRegion, htd);
-      LOG.info("Created new empty container region: " +
+      LOG.info("[" + thread + "] Created new empty container region: " +
          newRegion + " to contain regions: " + Joiner.on(",").join(overlap));
      debugLsr(region.getRegionFileSystem().getRegionDir());
@@ -2384,7 +2407,7 @@ public class HBaseFsck extends Configured {
      boolean didFix= false;
      Path target = region.getRegionFileSystem().getRegionDir();
      for (HbckInfo contained : overlap) {
-        LOG.info("Merging " + contained + " into " + target );
+        LOG.info("[" + thread + "] Merging " + contained + " into " + target );
        int merges = mergeRegionDirs(target, contained);
        if (merges > 0) {
          didFix = true;
@@ -2540,8 +2563,20 @@ public class HBaseFsck extends Configured {
        handler.handleRegionEndKeyNotEmpty(prevKey);
      }
-      for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
-        handler.handleOverlapGroup(overlap);
+      // TODO fold this into the TableIntegrityHandler
+      if (getConf().getBoolean("hbasefsck.overlap.merge.parallel", true)) {
+        LOG.info("Handling overlap merges in parallel. set hbasefsck.overlap.merge.parallel to" +
+            " false to run serially.");
+        boolean ok = handleOverlapsParallel(handler, prevKey);
+        if (!ok) {
+          return false;
+        }
+      } else {
+        LOG.info("Handling overlap merges serially. set hbasefsck.overlap.merge.parallel to" +
+            " true to run in parallel.");
+        for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
+          handler.handleOverlapGroup(overlap);
+        }
      }

      if (details) {
@@ -2565,6 +2600,38 @@ public class HBaseFsck extends Configured {
      return errors.getErrorList().size() == originalErrorsCount;
    }

+    private boolean handleOverlapsParallel(TableIntegrityErrorHandler handler, byte[] prevKey)
+        throws IOException {
+      // we parallelize overlap handler for the case we have lots of groups to fix. We can
+      // safely assume each group is independent.
+      List<WorkItemOverlapMerge> merges = new ArrayList<WorkItemOverlapMerge>(overlapGroups.size());
+      List<Future<Void>> rets;
+      for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
+        //
+        merges.add(new WorkItemOverlapMerge(overlap, handler));
+      }
+      try {
+        rets = executor.invokeAll(merges);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        LOG.error("Overlap merges were interrupted", e);
+        return false;
+      }
+      for(int i=0; i<merges.size(); i++) {
+        WorkItemOverlapMerge work = merges.get(i);
+        Future<Void> f = rets.get(i);
+        try {
+          f.get();
+        } catch(ExecutionException e) {
+          LOG.warn("Failed to merge overlap group" + work, e.getCause());
+        } catch (InterruptedException e) {
+          LOG.error("Waiting for overlap merges was interrupted", e);
+          return false;
+        }
+      }
+      return true;
+    }
+
    /**
     * This dumps data in a visually reasonable way for visual debugging
     *