HBASE-12891 Parallel execution for Hbck checkRegionConsistency

Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
Dave Latham 2015-04-02 08:32:15 -07:00 committed by Enis Soztutar
parent 926aaed113
commit b8e969be7a
1 changed files with 78 additions and 14 deletions

View File

@ -56,8 +56,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap; import com.google.common.collect.TreeMultimap;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -1767,23 +1769,75 @@ public class HBaseFsck extends Configured implements Closeable {
throws IOException, KeeperException, InterruptedException { throws IOException, KeeperException, InterruptedException {
// Divide the checks in two phases. One for default/primary replicas and another // Divide the checks in two phases. One for default/primary replicas and another
// for the non-primary ones. Keeps code cleaner this way. // for the non-primary ones. Keeps code cleaner this way.
List<CheckRegionConsistencyWorkItem> workItems =
new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) { for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
checkRegionConsistency(e.getKey(), e.getValue()); workItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
} }
} }
checkRegionConsistencyConcurrently(workItems);
boolean prevHdfsCheck = shouldCheckHdfs(); boolean prevHdfsCheck = shouldCheckHdfs();
setCheckHdfs(false); //replicas don't have any hdfs data setCheckHdfs(false); //replicas don't have any hdfs data
// Run a pass over the replicas and fix any assignment issues that exist on the currently // Run a pass over the replicas and fix any assignment issues that exist on the currently
// deployed/undeployed replicas. // deployed/undeployed replicas.
List<CheckRegionConsistencyWorkItem> replicaWorkItems =
new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) { for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
checkRegionConsistency(e.getKey(), e.getValue()); replicaWorkItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
} }
} }
checkRegionConsistencyConcurrently(replicaWorkItems);
setCheckHdfs(prevHdfsCheck); setCheckHdfs(prevHdfsCheck);
} }
/**
* Check consistency of all regions using mulitple threads concurrently.
*/
private void checkRegionConsistencyConcurrently(
final List<CheckRegionConsistencyWorkItem> workItems)
throws IOException, KeeperException, InterruptedException {
if (workItems.isEmpty()) {
return; // nothing to check
}
List<Future<Void>> workFutures = executor.invokeAll(workItems);
for(Future<Void> f: workFutures) {
try {
f.get();
} catch(ExecutionException e1) {
LOG.warn("Could not check region consistency " , e1.getCause());
if (e1.getCause() instanceof IOException) {
throw (IOException)e1.getCause();
} else if (e1.getCause() instanceof KeeperException) {
throw (KeeperException)e1.getCause();
} else if (e1.getCause() instanceof InterruptedException) {
throw (InterruptedException)e1.getCause();
} else {
throw new IOException(e1.getCause());
}
}
}
}
class CheckRegionConsistencyWorkItem implements Callable<Void> {
private final String key;
private final HbckInfo hbi;
CheckRegionConsistencyWorkItem(String key, HbckInfo hbi) {
this.key = key;
this.hbi = hbi;
}
@Override
public synchronized Void call() throws Exception {
checkRegionConsistency(key, hbi);
return null;
}
}
private void preCheckPermission() throws IOException, AccessDeniedException { private void preCheckPermission() throws IOException, AccessDeniedException {
if (shouldIgnorePreCheckPermission()) { if (shouldIgnorePreCheckPermission()) {
return; return;
@ -2079,16 +2133,8 @@ public class HBaseFsck extends Configured implements Closeable {
HRegionInfo hri = hbi.getHdfsHRI(); HRegionInfo hri = hbi.getHdfsHRI();
TableInfo tableInfo = tablesInfo.get(hri.getTable()); TableInfo tableInfo = tablesInfo.get(hri.getTable());
if (tableInfo.regionsFromMeta.isEmpty()) {
for (HbckInfo h : regionInfoMap.values()) { for (HRegionInfo region : tableInfo.getRegionsFromMeta()) {
if (hri.getTable().equals(h.getTableName())) {
if (h.metaEntry != null) tableInfo.regionsFromMeta
.add((HRegionInfo) h.metaEntry);
}
}
Collections.sort(tableInfo.regionsFromMeta);
}
for (HRegionInfo region : tableInfo.regionsFromMeta) {
if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0 if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0
&& (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(), && (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(),
hri.getEndKey()) >= 0) hri.getEndKey()) >= 0)
@ -2445,7 +2491,7 @@ public class HBaseFsck extends Configured implements Closeable {
TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp); TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp);
// list of regions derived from meta entries. // list of regions derived from meta entries.
final List<HRegionInfo> regionsFromMeta = new ArrayList<HRegionInfo>(); private ImmutableList<HRegionInfo> regionsFromMeta = null;
TableInfo(TableName name) { TableInfo(TableName name) {
this.tableName = name; this.tableName = name;
@ -2502,6 +2548,24 @@ public class HBaseFsck extends Configured implements Closeable {
return sc.getStarts().size() + backwards.size(); return sc.getStarts().size() + backwards.size();
} }
public synchronized ImmutableList<HRegionInfo> getRegionsFromMeta() {
// lazy loaded, synchronized to ensure a single load
if (regionsFromMeta == null) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
for (HbckInfo h : HBaseFsck.this.regionInfoMap.values()) {
if (tableName.equals(h.getTableName())) {
if (h.metaEntry != null) {
regions.add((HRegionInfo) h.metaEntry);
}
}
}
regionsFromMeta = Ordering.natural().immutableSortedCopy(regions);
}
return regionsFromMeta;
}
private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl { private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl {
ErrorReporter errors; ErrorReporter errors;