HBASE-12891 Parallel execution for Hbck checkRegionConsistency
Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
parent
1e6ea5aab7
commit
61f4ce6880
|
@ -56,8 +56,10 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Multimap;
|
import com.google.common.collect.Multimap;
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.common.collect.TreeMultimap;
|
import com.google.common.collect.TreeMultimap;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
@ -1767,20 +1769,28 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
throws IOException, KeeperException, InterruptedException {
|
throws IOException, KeeperException, InterruptedException {
|
||||||
// Divide the checks in two phases. One for default/primary replicas and another
|
// Divide the checks in two phases. One for default/primary replicas and another
|
||||||
// for the non-primary ones. Keeps code cleaner this way.
|
// for the non-primary ones. Keeps code cleaner this way.
|
||||||
|
|
||||||
|
List<CheckRegionConsistencyWorkItem> workItems =
|
||||||
|
new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
|
||||||
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
|
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
|
||||||
if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
|
if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
|
||||||
checkRegionConsistency(e.getKey(), e.getValue());
|
workItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
checkRegionConsistencyConcurrently(workItems);
|
||||||
|
|
||||||
boolean prevHdfsCheck = shouldCheckHdfs();
|
boolean prevHdfsCheck = shouldCheckHdfs();
|
||||||
setCheckHdfs(false); //replicas don't have any hdfs data
|
setCheckHdfs(false); //replicas don't have any hdfs data
|
||||||
// Run a pass over the replicas and fix any assignment issues that exist on the currently
|
// Run a pass over the replicas and fix any assignment issues that exist on the currently
|
||||||
// deployed/undeployed replicas.
|
// deployed/undeployed replicas.
|
||||||
|
List<CheckRegionConsistencyWorkItem> replicaWorkItems =
|
||||||
|
new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size());
|
||||||
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
|
for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) {
|
||||||
if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
|
if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
|
||||||
checkRegionConsistency(e.getKey(), e.getValue());
|
replicaWorkItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
checkRegionConsistencyConcurrently(replicaWorkItems);
|
||||||
setCheckHdfs(prevHdfsCheck);
|
setCheckHdfs(prevHdfsCheck);
|
||||||
|
|
||||||
if (shouldCheckHdfs()) {
|
if (shouldCheckHdfs()) {
|
||||||
|
@ -1788,6 +1798,51 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check consistency of all regions using mulitple threads concurrently.
|
||||||
|
*/
|
||||||
|
private void checkRegionConsistencyConcurrently(
|
||||||
|
final List<CheckRegionConsistencyWorkItem> workItems)
|
||||||
|
throws IOException, KeeperException, InterruptedException {
|
||||||
|
if (workItems.isEmpty()) {
|
||||||
|
return; // nothing to check
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Future<Void>> workFutures = executor.invokeAll(workItems);
|
||||||
|
for(Future<Void> f: workFutures) {
|
||||||
|
try {
|
||||||
|
f.get();
|
||||||
|
} catch(ExecutionException e1) {
|
||||||
|
LOG.warn("Could not check region consistency " , e1.getCause());
|
||||||
|
if (e1.getCause() instanceof IOException) {
|
||||||
|
throw (IOException)e1.getCause();
|
||||||
|
} else if (e1.getCause() instanceof KeeperException) {
|
||||||
|
throw (KeeperException)e1.getCause();
|
||||||
|
} else if (e1.getCause() instanceof InterruptedException) {
|
||||||
|
throw (InterruptedException)e1.getCause();
|
||||||
|
} else {
|
||||||
|
throw new IOException(e1.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class CheckRegionConsistencyWorkItem implements Callable<Void> {
|
||||||
|
private final String key;
|
||||||
|
private final HbckInfo hbi;
|
||||||
|
|
||||||
|
CheckRegionConsistencyWorkItem(String key, HbckInfo hbi) {
|
||||||
|
this.key = key;
|
||||||
|
this.hbi = hbi;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized Void call() throws Exception {
|
||||||
|
checkRegionConsistency(key, hbi);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check and fix table states, assumes full info available:
|
* Check and fix table states, assumes full info available:
|
||||||
* - tableInfos
|
* - tableInfos
|
||||||
|
@ -2128,16 +2183,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
|
|
||||||
HRegionInfo hri = hbi.getHdfsHRI();
|
HRegionInfo hri = hbi.getHdfsHRI();
|
||||||
TableInfo tableInfo = tablesInfo.get(hri.getTable());
|
TableInfo tableInfo = tablesInfo.get(hri.getTable());
|
||||||
if (tableInfo.regionsFromMeta.isEmpty()) {
|
|
||||||
for (HbckInfo h : regionInfoMap.values()) {
|
for (HRegionInfo region : tableInfo.getRegionsFromMeta()) {
|
||||||
if (hri.getTable().equals(h.getTableName())) {
|
|
||||||
if (h.metaEntry != null) tableInfo.regionsFromMeta
|
|
||||||
.add((HRegionInfo) h.metaEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Collections.sort(tableInfo.regionsFromMeta);
|
|
||||||
}
|
|
||||||
for (HRegionInfo region : tableInfo.regionsFromMeta) {
|
|
||||||
if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0
|
if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0
|
||||||
&& (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(),
|
&& (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(),
|
||||||
hri.getEndKey()) >= 0)
|
hri.getEndKey()) >= 0)
|
||||||
|
@ -2493,7 +2540,7 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp);
|
TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp);
|
||||||
|
|
||||||
// list of regions derived from meta entries.
|
// list of regions derived from meta entries.
|
||||||
final List<HRegionInfo> regionsFromMeta = new ArrayList<HRegionInfo>();
|
private ImmutableList<HRegionInfo> regionsFromMeta = null;
|
||||||
|
|
||||||
TableInfo(TableName name) {
|
TableInfo(TableName name) {
|
||||||
this.tableName = name;
|
this.tableName = name;
|
||||||
|
@ -2550,6 +2597,23 @@ public class HBaseFsck extends Configured implements Closeable {
|
||||||
return sc.getStarts().size() + backwards.size();
|
return sc.getStarts().size() + backwards.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized ImmutableList<HRegionInfo> getRegionsFromMeta() {
|
||||||
|
// lazy loaded, synchronized to ensure a single load
|
||||||
|
if (regionsFromMeta == null) {
|
||||||
|
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
|
||||||
|
for (HbckInfo h : HBaseFsck.this.regionInfoMap.values()) {
|
||||||
|
if (tableName.equals(h.getTableName())) {
|
||||||
|
if (h.metaEntry != null) {
|
||||||
|
regions.add((HRegionInfo) h.metaEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
regionsFromMeta = Ordering.natural().immutableSortedCopy(regions);
|
||||||
|
}
|
||||||
|
|
||||||
|
return regionsFromMeta;
|
||||||
|
}
|
||||||
|
|
||||||
private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl {
|
private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl {
|
||||||
ErrorReporter errors;
|
ErrorReporter errors;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue