HBASE-5712 Parallelize load of .regioninfo files in diagnostic/repair portion of hbck
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1332072 13f79535-47bb-0310-9956-ffa450edef68
parent b695b64dfa
commit 87899ccda8
src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

@@ -28,9 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -201,12 +203,12 @@ public class HBaseFsck {
    * detect table consistency problems (holes, dupes, overlaps). It is sorted
    * to prevent dupes.
    */
-  private TreeMap<String, TableInfo> tablesInfo = new TreeMap<String, TableInfo>();
+  private SortedMap<String, TableInfo> tablesInfo = new ConcurrentSkipListMap<String,TableInfo>();
 
   /**
    * When initially looking at HDFS, we attempt to find any orphaned data.
    */
-  private List<HbckInfo> orphanHdfsDirs = new ArrayList<HbckInfo>();
+  private List<HbckInfo> orphanHdfsDirs = Collections.synchronizedList(new ArrayList<HbckInfo>());
 
   /**
    * Constructor
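Note on the two field changes above: the WorkItemHdfsRegionInfo tasks introduced at the end of this diff write to both fields from executor threads, so the plain TreeMap and ArrayList are swapped for thread-safe equivalents. ConcurrentSkipListMap keeps tablesInfo sorted and dupe-free, as its comment requires, while tolerating concurrent puts; Collections.synchronizedList makes orphanHdfsDirs.add safe without further locking. A minimal standalone sketch of that behavior (the demo class and every name in it are made up, not HBase code):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentCollectionsDemo {
  public static void main(String[] args) throws InterruptedException {
    // Sorted map that tolerates concurrent writers, like the new tablesInfo field.
    final SortedMap<String, Integer> tables = new ConcurrentSkipListMap<String, Integer>();
    // Plain ArrayList wrapped so add() from worker threads is safe,
    // like the new orphanHdfsDirs field.
    final List<String> orphans = Collections.synchronizedList(new ArrayList<String>());

    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 100; i++) {
      final int n = i;
      pool.execute(new Runnable() {
        public void run() {
          tables.put("table-" + n, n);   // concurrent puts, no external locking
          if (n % 10 == 0) {
            orphans.add("orphan-" + n);  // synchronized add
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);

    // Iteration order is still sorted by key, which sorted reporting relies on.
    System.out.println(tables.firstKey() + " .. " + tables.lastKey());
    System.out.println(orphans.size() + " orphans recorded");
  }
}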
@@ -414,6 +416,11 @@ public class HBaseFsck {
     Path p = hi.getHdfsRegionDir();
     FileSystem fs = p.getFileSystem(conf);
     FileStatus[] dirs = fs.listStatus(p);
+    if (dirs == null) {
+      LOG.warn("Attempt to adopt ophan hdfs region skipped becuase no files present in " +
+          p + ". This dir could probably be deleted.");
+      return ;
+    }
 
     String tableName = Bytes.toString(hi.getTableName());
     TableInfo tableInfo = tablesInfo.get(tableName);
@@ -585,6 +592,12 @@ public class HBaseFsck {
       LOG.warn("No HDFS region dir found: " + hbi + " meta=" + hbi.metaEntry);
       return;
     }
+
+    if (hbi.hdfsEntry.hri != null) {
+      // already loaded data
+      return;
+    }
+
     Path regioninfo = new Path(regionDir, HRegion.REGIONINFO_FILE);
     FileSystem fs = regioninfo.getFileSystem(conf);
 
@@ -612,27 +625,37 @@ public class HBaseFsck {
   /**
    * Populate hbi's from regionInfos loaded from file system.
    */
-  private TreeMap<String, TableInfo> loadHdfsRegionInfos() throws IOException {
+  private SortedMap<String, TableInfo> loadHdfsRegionInfos() throws IOException, InterruptedException {
     tablesInfo.clear(); // regenerating the data
     // generate region split structure
-    for (HbckInfo hbi : regionInfoMap.values()) {
+    Collection<HbckInfo> hbckInfos = regionInfoMap.values();
 
-      // only load entries that haven't been loaded yet.
-      if (hbi.getHdfsHRI() == null) {
-        try {
-          loadHdfsRegioninfo(hbi);
-        } catch (IOException ioe) {
-          String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
-            + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
-            + hbi.getHdfsRegionDir()
-            + "! It may be an invalid format or version file. Treating as "
-            + "an orphaned regiondir.";
-          errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
-          debugLsr(hbi.getHdfsRegionDir());
-          orphanHdfsDirs.add(hbi);
-          continue;
-        }
-      }
+    // Parallelized read of .regioninfo files.
+    WorkItemHdfsRegionInfo[] hbis = new WorkItemHdfsRegionInfo[hbckInfos.size()];
+    int num = 0;
+    for (HbckInfo hbi : hbckInfos) {
+      hbis[num] = new WorkItemHdfsRegionInfo(hbi, this, errors);
+      executor.execute(hbis[num]);
+      num++;
+    }
+
+    for (int i=0; i < num; i++) {
+      WorkItemHdfsRegionInfo hbi = hbis[i];
+      synchronized(hbi) {
+        while (!hbi.isDone()) {
+          hbi.wait();
+        }
+      }
+    }
+
+    // serialized table info gathering.
+    for (HbckInfo hbi: hbckInfos) {
+
+      if (hbi.getHdfsHRI() == null) {
+        // was an orphan
+        continue;
+      }
+
 
       // get table name from hdfs, populate various HBaseFsck tables.
       String tableName = Bytes.toString(hbi.getTableName());
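The gather loop above blocks on each work item's monitor until its done flag flips; a TODO in the worker class added later in this diff notes that this could instead be expressed with futures. Purely as an illustration, the same scatter-gather shape with ExecutorService.submit and Future.get (loadOne and every other name here is hypothetical, not hbck code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ScatterGatherDemo {
  // Hypothetical stand-in for reading one region's .regioninfo file.
  static String loadOne(String regionDir) throws IOException {
    return "regioninfo(" + regionDir + ")";
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> regionDirs = Arrays.asList("r1", "r2", "r3");
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // Scatter: one task per region, mirroring the executor.execute() loop above.
    List<Future<String>> futures = new ArrayList<Future<String>>();
    for (final String dir : regionDirs) {
      futures.add(executor.submit(new Callable<String>() {
        public String call() throws IOException {
          return loadOne(dir);
        }
      }));
    }

    // Gather: Future.get() replaces the synchronized isDone()/wait() loop.
    for (Future<String> f : futures) {
      try {
        System.out.println(f.get());
      } catch (ExecutionException e) {
        // An IOException from loadOne surfaces here; hbck instead records
        // the region as an orphan and keeps going.
        System.err.println("orphan: " + e.getCause());
      }
    }
    executor.shutdown();
  }
}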
@@ -647,10 +670,16 @@ public class HBaseFsck {
         // only executed once per table.
         modTInfo = new TableInfo(tableName);
         Path hbaseRoot = new Path(conf.get(HConstants.HBASE_DIR));
-        HTableDescriptor htd =
-          FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(conf),
-          hbaseRoot, tableName);
-        modTInfo.htds.add(htd);
+        try {
+          HTableDescriptor htd =
+            FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(conf),
+            hbaseRoot, tableName);
+          modTInfo.htds.add(htd);
+        } catch (IOException ioe) {
+          LOG.error("Unable to read .tableinfo from " + hbaseRoot, ioe);
+          throw ioe;
+        }
+
       }
       modTInfo.addRegionInfo(hbi);
       tablesInfo.put(tableName, modTInfo);
@@ -691,7 +720,7 @@ public class HBaseFsck {
    *
    * @return An array list of puts to do in bulk, null if tables have problems
    */
-  private ArrayList<Put> generatePuts(TreeMap<String, TableInfo> tablesInfo) throws IOException {
+  private ArrayList<Put> generatePuts(SortedMap<String, TableInfo> tablesInfo) throws IOException {
     ArrayList<Put> puts = new ArrayList<Put>();
     boolean hasProblems = false;
     for (Entry<String, TableInfo> e : tablesInfo.entrySet()) {
@@ -731,7 +760,7 @@ public class HBaseFsck {
   /**
    * Suggest fixes for each table
    */
-  private void suggestFixes(TreeMap<String, TableInfo> tablesInfo) throws IOException {
+  private void suggestFixes(SortedMap<String, TableInfo> tablesInfo) throws IOException {
     for (TableInfo tInfo : tablesInfo.values()) {
       TableIntegrityErrorHandler handler = tInfo.new IntegrityFixSuggester(tInfo, errors);
       tInfo.checkRegionChain(handler);
@@ -802,7 +831,7 @@ public class HBaseFsck {
     return true;
   }
 
-  private TreeMap<String, TableInfo> checkHdfsIntegrity(boolean fixHoles,
+  private SortedMap<String, TableInfo> checkHdfsIntegrity(boolean fixHoles,
       boolean fixOverlaps) throws IOException {
     LOG.info("Checking HBase region split map from HDFS data...");
     for (TableInfo tInfo : tablesInfo.values()) {
@@ -1428,7 +1457,7 @@ public class HBaseFsck {
    * repeated or overlapping ones.
    * @throws IOException
    */
-  TreeMap<String, TableInfo> checkIntegrity() throws IOException {
+  SortedMap<String, TableInfo> checkIntegrity() throws IOException {
     tablesInfo = new TreeMap<String,TableInfo> ();
     List<HbckInfo> noHDFSRegionInfos = new ArrayList<HbckInfo>();
     LOG.debug("There are " + regionInfoMap.size() + " region info entries");
@@ -2448,7 +2477,7 @@ public class HBaseFsck {
   /**
    * Prints summary of all tables found on the system.
    */
-  private void printTableSummary(TreeMap<String, TableInfo> tablesInfo) {
+  private void printTableSummary(SortedMap<String, TableInfo> tablesInfo) {
     System.out.println("Summary:");
     for (TableInfo tInfo : tablesInfo.values()) {
       if (errors.tableHasErrors(tInfo)) {
@@ -2747,6 +2776,58 @@ public class HBaseFsck {
     }
   }
 
+  /**
+   * Contact hdfs and get all information about specified table directory into
+   * regioninfo list.
+   */
+  static class WorkItemHdfsRegionInfo implements Runnable {
+    private HbckInfo hbi;
+    private HBaseFsck hbck;
+    private ErrorReporter errors;
+    private boolean done;
+
+    WorkItemHdfsRegionInfo(HbckInfo hbi, HBaseFsck hbck, ErrorReporter errors) {
+      this.hbi = hbi;
+      this.hbck = hbck;
+      this.errors = errors;
+      this.done = false;
+    }
+
+    synchronized boolean isDone() {
+      return done;
+    }
+
+    @Override
+    public synchronized void run() {
+      try {
+        // only load entries that haven't been loaded yet.
+        if (hbi.getHdfsHRI() == null) {
+          try {
+            hbck.loadHdfsRegioninfo(hbi);
+          } catch (IOException ioe) {
+            String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
+              + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
+              + hbi.getHdfsRegionDir()
+              + "! It may be an invalid format or version file. Treating as "
+              + "an orphaned regiondir.";
+            errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
+            try {
+              hbck.debugLsr(hbi.getHdfsRegionDir());
+            } catch (IOException ioe2) {
+              LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
+              return; // TODO convert this in to a future
+            }
+            hbck.orphanHdfsDirs.add(hbi);
+            return;
+          }
+        }
+      } finally {
+        done = true;
+        notifyAll();
+      }
+    }
+  };
+
   /**
    * Display the full report from fsck. This displays all live and dead region
    * servers, and all known regions.
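The completion handshake in this worker depends on run() being synchronized on the work item itself: done is set and notifyAll() is issued under the same monitor that the gather loop in loadHdfsRegionInfos() holds around its isDone()/wait() check, so a wake-up cannot slip in between the test and the wait. A minimal sketch of just that pattern (hypothetical Task class, not HBase code):

public class WaitNotifyDemo {
  static class Task implements Runnable {
    private boolean done = false;

    synchronized boolean isDone() {
      return done;
    }

    @Override
    public synchronized void run() {
      // Real work would happen here, while holding the monitor.
      try {
        Thread.sleep(100); // stand-in for the actual work
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      } finally {
        done = true;   // set under the monitor...
        notifyAll();   // ...then wake every thread blocked in wait()
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Task t = new Task();
    new Thread(t).start();

    // Waiter side: check-then-wait holds the task's monitor, so the worker
    // cannot complete and notify between the isDone() test and the wait().
    synchronized (t) {
      while (!t.isDone()) {
        t.wait();
      }
    }
    System.out.println("task finished");
  }
}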
src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java

@@ -879,10 +879,10 @@ public class TestHBaseFsck {
    * the region is not deployed when the table is disabled.
    */
   @Test
-  public void testRegionShouldNotDeployed() throws Exception {
-    String table = "tableRegionShouldNotDeployed";
+  public void testRegionShouldNotBeDeployed() throws Exception {
+    String table = "tableRegionShouldNotBeDeployed";
     try {
-      LOG.info("Starting testRegionShouldNotDeployed.");
+      LOG.info("Starting testRegionShouldNotBeDeployed.");
       MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
       assertTrue(cluster.waitForActiveAndReadyMaster());