HBASE-5892 [hbck] Refactor parallel WorkItem* to Futures (Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1345890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2012-06-04 09:41:14 +00:00
parent 57a542e685
commit b737e2e8bd
1 changed file with 85 additions and 101 deletions

View File

@@ -32,10 +32,11 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -157,7 +158,7 @@ public class HBaseFsck {
private HConnection connection;
private HBaseAdmin admin;
private HTable meta;
private ThreadPoolExecutor executor; // threads to retrieve data from regionservers
private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers
private long startMillis = System.currentTimeMillis();
/***********
@@ -223,10 +224,7 @@ public class HBaseFsck {
this.conf = conf;
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
executor = new ThreadPoolExecutor(numThreads, numThreads,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);
executor = new ScheduledThreadPoolExecutor(numThreads);
}
/**
@@ -627,20 +625,25 @@ public class HBaseFsck {
Collection<HbckInfo> hbckInfos = regionInfoMap.values();
// Parallelized read of .regioninfo files.
WorkItemHdfsRegionInfo[] hbis = new WorkItemHdfsRegionInfo[hbckInfos.size()];
int num = 0;
List<WorkItemHdfsRegionInfo> hbis = new ArrayList<WorkItemHdfsRegionInfo>(hbckInfos.size());
List<Future<Void>> hbiFutures;
for (HbckInfo hbi : hbckInfos) {
hbis[num] = new WorkItemHdfsRegionInfo(hbi, this, errors);
executor.execute(hbis[num]);
num++;
WorkItemHdfsRegionInfo work = new WorkItemHdfsRegionInfo(hbi, this, errors);
hbis.add(work);
}
for (int i=0; i < num; i++) {
WorkItemHdfsRegionInfo hbi = hbis[i];
synchronized(hbi) {
while (!hbi.isDone()) {
hbi.wait();
}
// Submit and wait for completion
hbiFutures = executor.invokeAll(hbis);
for(int i=0; i<hbiFutures.size(); i++) {
WorkItemHdfsRegionInfo work = hbis.get(i);
Future<Void> f = hbiFutures.get(i);
try {
f.get();
} catch(ExecutionException e) {
LOG.warn("Failed to read .regioninfo file for region " +
work.hbi.getRegionNameAsString(), e.getCause());
}
}
@@ -1052,22 +1055,22 @@ public class HBaseFsck {
}
// level 1: <HBASE_DIR>/*
WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];
int num = 0;
List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size());
List<Future<Void>> dirsFutures;
for (FileStatus tableDir : tableDirs) {
LOG.debug("Loading region dirs from " +tableDir.getPath());
dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir);
executor.execute(dirs[num]);
num++;
dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir));
}
// wait for all directories to be done
for (int i = 0; i < num; i++) {
WorkItemHdfsDir dir = dirs[i];
synchronized (dir) {
while (!dir.isDone()) {
dir.wait();
}
// Invoke and wait for Callables to complete
dirsFutures = executor.invokeAll(dirs);
for(Future<Void> f: dirsFutures) {
try {
f.get();
} catch(ExecutionException e) {
LOG.warn("Could not load region dir " , e.getCause());
}
}
}
@@ -1137,22 +1140,24 @@ public class HBaseFsck {
void processRegionServers(Collection<ServerName> regionServerList)
throws IOException, InterruptedException {
WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
int num = 0;
List<WorkItemRegion> workItems = new ArrayList<WorkItemRegion>(regionServerList.size());
List<Future<Void>> workFutures;
// loop to contact each region server in parallel
for (ServerName rsinfo: regionServerList) {
work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
executor.execute(work[num]);
num++;
workItems.add(new WorkItemRegion(this, rsinfo, errors, connection));
}
// wait for all submitted tasks to be done
for (int i = 0; i < num; i++) {
synchronized (work[i]) {
while (!work[i].isDone()) {
work[i].wait();
}
workFutures = executor.invokeAll(workItems);
for(int i=0; i<workFutures.size(); i++) {
WorkItemRegion item = workItems.get(i);
Future<Void> f = workFutures.get(i);
try {
f.get();
} catch(ExecutionException e) {
LOG.warn("Could not process regionserver " + item.rsinfo.getHostAndPort(),
e.getCause());
}
}
}
@@ -2367,10 +2372,11 @@ public class HBaseFsck {
if (metaEntry != null) {
return metaEntry.getRegionNameAsString();
} else if (hdfsEntry != null) {
return hdfsEntry.hri.getRegionNameAsString();
} else {
return null;
if (hdfsEntry.hri != null) {
return hdfsEntry.hri.getRegionNameAsString();
}
}
return null;
}
public byte[] getRegionName() {
@@ -2619,12 +2625,11 @@ public class HBaseFsck {
/**
* Contact a region server and get all information from it
*/
static class WorkItemRegion implements Runnable {
static class WorkItemRegion implements Callable<Void> {
private HBaseFsck hbck;
private ServerName rsinfo;
private ErrorReporter errors;
private HConnection connection;
private boolean done;
WorkItemRegion(HBaseFsck hbck, ServerName info,
ErrorReporter errors, HConnection connection) {
@@ -2632,16 +2637,10 @@ public class HBaseFsck {
this.rsinfo = info;
this.errors = errors;
this.connection = connection;
this.done = false;
}
// is this task done?
synchronized boolean isDone() {
return done;
}
@Override
public synchronized void run() {
public synchronized Void call() throws IOException {
errors.progress();
try {
AdminProtocol server =
@@ -2672,10 +2671,9 @@ public class HBaseFsck {
} catch (IOException e) { // unable to connect to the region server.
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() +
" Unable to fetch region information. " + e);
} finally {
done = true;
notifyAll(); // wakeup anybody waiting for this item to be done
throw e;
}
return null;
}
private List<HRegionInfo> filterOnlyMetaRegions(List<HRegionInfo> regions) {
@@ -2693,12 +2691,11 @@ public class HBaseFsck {
* Contact hdfs and get all information about specified table directory into
* regioninfo list.
*/
static class WorkItemHdfsDir implements Runnable {
static class WorkItemHdfsDir implements Callable<Void> {
private HBaseFsck hbck;
private FileStatus tableDir;
private ErrorReporter errors;
private FileSystem fs;
private boolean done;
WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
FileStatus status) {
@@ -2706,27 +2703,25 @@ public class HBaseFsck {
this.fs = fs;
this.tableDir = status;
this.errors = errors;
this.done = false;
}
synchronized boolean isDone() {
return done;
}
@Override
public synchronized void run() {
public synchronized Void call() throws IOException {
try {
String tableName = tableDir.getPath().getName();
// ignore hidden files
if (tableName.startsWith(".") &&
!tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
return;
!tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) {
return null;
}
// level 2: <HBASE_DIR>/<table>/*
FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
for (FileStatus regionDir : regionDirs) {
String encodedName = regionDir.getPath().getName();
// ignore directories that aren't hexadecimal
if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
if (!encodedName.toLowerCase().matches("[0-9a-f]+")) {
continue;
}
LOG.debug("Loading region info from hdfs:"+ regionDir.getPath());
HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
@@ -2763,10 +2758,9 @@ public class HBaseFsck {
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
+ tableDir.getPath().getName()
+ " Unable to fetch region information. " + e);
} finally {
done = true;
notifyAll();
throw e;
}
return null;
}
}
@@ -2774,51 +2768,41 @@ public class HBaseFsck {
* Contact hdfs and get all information about specified table directory into
* regioninfo list.
*/
static class WorkItemHdfsRegionInfo implements Runnable {
static class WorkItemHdfsRegionInfo implements Callable<Void> {
private HbckInfo hbi;
private HBaseFsck hbck;
private ErrorReporter errors;
private boolean done;
WorkItemHdfsRegionInfo(HbckInfo hbi, HBaseFsck hbck, ErrorReporter errors) {
this.hbi = hbi;
this.hbck = hbck;
this.errors = errors;
this.done = false;
}
synchronized boolean isDone() {
return done;
}
@Override
public synchronized void run() {
try {
// only load entries that haven't been loaded yet.
if (hbi.getHdfsHRI() == null) {
public synchronized Void call() throws IOException {
// only load entries that haven't been loaded yet.
if (hbi.getHdfsHRI() == null) {
try {
hbck.loadHdfsRegioninfo(hbi);
} catch (IOException ioe) {
String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
+ Bytes.toString(hbi.getTableName()) + " in hdfs dir "
+ hbi.getHdfsRegionDir()
+ "! It may be an invalid format or version file. Treating as "
+ "an orphaned regiondir.";
errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
try {
hbck.loadHdfsRegioninfo(hbi);
} catch (IOException ioe) {
String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
+ Bytes.toString(hbi.getTableName()) + " in hdfs dir "
+ hbi.getHdfsRegionDir()
+ "! It may be an invalid format or version file. Treating as "
+ "an orphaned regiondir.";
errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
try {
hbck.debugLsr(hbi.getHdfsRegionDir());
} catch (IOException ioe2) {
LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
return; // TODO convert this in to a future
}
hbck.orphanHdfsDirs.add(hbi);
return;
hbck.debugLsr(hbi.getHdfsRegionDir());
} catch (IOException ioe2) {
LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
throw ioe2;
}
hbck.orphanHdfsDirs.add(hbi);
throw ioe;
}
} finally {
done = true;
notifyAll();
}
return null;
}
};