From b737e2e8bd6df5c95ddff1d8f59a6ab4aa0ff837 Mon Sep 17 00:00:00 2001 From: Jonathan Hsieh Date: Mon, 4 Jun 2012 09:41:14 +0000 Subject: [PATCH] HBASE-5892 [hbck] Refactor parallel WorkItem* to Futures (Andrew Wang) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1345890 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/util/HBaseFsck.java | 186 ++++++++---------- 1 file changed, 85 insertions(+), 101 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 653866e45a7..8221e142e1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -32,10 +32,11 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -157,7 +158,7 @@ public class HBaseFsck { private HConnection connection; private HBaseAdmin admin; private HTable meta; - private ThreadPoolExecutor executor; // threads to retrieve data from regionservers + private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers private long startMillis = System.currentTimeMillis(); /*********** @@ -223,10 +224,7 @@ public class HBaseFsck { this.conf = conf; int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS); - executor = new ThreadPoolExecutor(numThreads, numThreads, - THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, - new LinkedBlockingQueue()); - executor.allowCoreThreadTimeOut(true); + executor = new ScheduledThreadPoolExecutor(numThreads); } /** @@ -627,20 +625,25 @@ public class HBaseFsck { Collection hbckInfos = regionInfoMap.values(); // Parallelized read of .regioninfo files. - WorkItemHdfsRegionInfo[] hbis = new WorkItemHdfsRegionInfo[hbckInfos.size()]; - int num = 0; + List hbis = new ArrayList(hbckInfos.size()); + List> hbiFutures; + for (HbckInfo hbi : hbckInfos) { - hbis[num] = new WorkItemHdfsRegionInfo(hbi, this, errors); - executor.execute(hbis[num]); - num++; + WorkItemHdfsRegionInfo work = new WorkItemHdfsRegionInfo(hbi, this, errors); + hbis.add(work); } - for (int i=0; i < num; i++) { - WorkItemHdfsRegionInfo hbi = hbis[i]; - synchronized(hbi) { - while (!hbi.isDone()) { - hbi.wait(); - } + // Submit and wait for completion + hbiFutures = executor.invokeAll(hbis); + + for(int i=0; i f = hbiFutures.get(i); + try { + f.get(); + } catch(ExecutionException e) { + LOG.warn("Failed to read .regioninfo file for region " + + work.hbi.getRegionNameAsString(), e.getCause()); } } @@ -1052,22 +1055,22 @@ public class HBaseFsck { } // level 1: /* - WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()]; - int num = 0; + List dirs = new ArrayList(tableDirs.size()); + List> dirsFutures; + for (FileStatus tableDir : tableDirs) { LOG.debug("Loading region dirs from " +tableDir.getPath()); - dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir); - executor.execute(dirs[num]); - num++; + dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir)); } - // wait for all directories to be done - for (int i = 0; i < num; i++) { - WorkItemHdfsDir dir = dirs[i]; - synchronized (dir) { - while (!dir.isDone()) { - dir.wait(); - } + // Invoke and wait for Callables to complete + dirsFutures = executor.invokeAll(dirs); + + for(Future f: dirsFutures) { + try { + f.get(); + } catch(ExecutionException e) { + LOG.warn("Could not load region dir " , e.getCause()); } } } @@ -1137,22 +1140,24 @@ public class HBaseFsck { void processRegionServers(Collection regionServerList) throws IOException, InterruptedException { - WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()]; - int num = 0; + List workItems = new ArrayList(regionServerList.size()); + List> workFutures; // loop to contact each region server in parallel for (ServerName rsinfo: regionServerList) { - work[num] = new WorkItemRegion(this, rsinfo, errors, connection); - executor.execute(work[num]); - num++; + workItems.add(new WorkItemRegion(this, rsinfo, errors, connection)); } - // wait for all submitted tasks to be done - for (int i = 0; i < num; i++) { - synchronized (work[i]) { - while (!work[i].isDone()) { - work[i].wait(); - } + workFutures = executor.invokeAll(workItems); + + for(int i=0; i f = workFutures.get(i); + try { + f.get(); + } catch(ExecutionException e) { + LOG.warn("Could not process regionserver " + item.rsinfo.getHostAndPort(), + e.getCause()); } } } @@ -2367,10 +2372,11 @@ public class HBaseFsck { if (metaEntry != null) { return metaEntry.getRegionNameAsString(); } else if (hdfsEntry != null) { - return hdfsEntry.hri.getRegionNameAsString(); - } else { - return null; + if (hdfsEntry.hri != null) { + return hdfsEntry.hri.getRegionNameAsString(); + } } + return null; } public byte[] getRegionName() { @@ -2619,12 +2625,11 @@ public class HBaseFsck { /** * Contact a region server and get all information from it */ - static class WorkItemRegion implements Runnable { + static class WorkItemRegion implements Callable { private HBaseFsck hbck; private ServerName rsinfo; private ErrorReporter errors; private HConnection connection; - private boolean done; WorkItemRegion(HBaseFsck hbck, ServerName info, ErrorReporter errors, HConnection connection) { @@ -2632,16 +2637,10 @@ public class HBaseFsck { this.rsinfo = info; this.errors = errors; this.connection = connection; - this.done = false; - } - - // is this task done? - synchronized boolean isDone() { - return done; } @Override - public synchronized void run() { + public synchronized Void call() throws IOException { errors.progress(); try { AdminProtocol server = @@ -2672,10 +2671,9 @@ public class HBaseFsck { } catch (IOException e) { // unable to connect to the region server. errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() + " Unable to fetch region information. " + e); - } finally { - done = true; - notifyAll(); // wakeup anybody waiting for this item to be done + throw e; } + return null; } private List filterOnlyMetaRegions(List regions) { @@ -2693,12 +2691,11 @@ public class HBaseFsck { * Contact hdfs and get all information about specified table directory into * regioninfo list. */ - static class WorkItemHdfsDir implements Runnable { + static class WorkItemHdfsDir implements Callable { private HBaseFsck hbck; private FileStatus tableDir; private ErrorReporter errors; private FileSystem fs; - private boolean done; WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, FileStatus status) { @@ -2706,27 +2703,25 @@ public class HBaseFsck { this.fs = fs; this.tableDir = status; this.errors = errors; - this.done = false; } - synchronized boolean isDone() { - return done; - } - @Override - public synchronized void run() { + public synchronized Void call() throws IOException { try { String tableName = tableDir.getPath().getName(); // ignore hidden files if (tableName.startsWith(".") && - !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) - return; + !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) { + return null; + } // level 2: //* FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); for (FileStatus regionDir : regionDirs) { String encodedName = regionDir.getPath().getName(); // ignore directories that aren't hexadecimal - if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue; + if (!encodedName.toLowerCase().matches("[0-9a-f]+")) { + continue; + } LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); HbckInfo hbi = hbck.getOrCreateInfo(encodedName); @@ -2763,10 +2758,9 @@ public class HBaseFsck { errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " + tableDir.getPath().getName() + " Unable to fetch region information. " + e); - } finally { - done = true; - notifyAll(); + throw e; } + return null; } } @@ -2774,51 +2768,41 @@ public class HBaseFsck { * Contact hdfs and get all information about specified table directory into * regioninfo list. */ - static class WorkItemHdfsRegionInfo implements Runnable { + static class WorkItemHdfsRegionInfo implements Callable { private HbckInfo hbi; private HBaseFsck hbck; private ErrorReporter errors; - private boolean done; WorkItemHdfsRegionInfo(HbckInfo hbi, HBaseFsck hbck, ErrorReporter errors) { this.hbi = hbi; this.hbck = hbck; this.errors = errors; - this.done = false; - } - - synchronized boolean isDone() { - return done; } @Override - public synchronized void run() { - try { - // only load entries that haven't been loaded yet. - if (hbi.getHdfsHRI() == null) { + public synchronized Void call() throws IOException { + // only load entries that haven't been loaded yet. + if (hbi.getHdfsHRI() == null) { + try { + hbck.loadHdfsRegioninfo(hbi); + } catch (IOException ioe) { + String msg = "Orphan region in HDFS: Unable to load .regioninfo from table " + + Bytes.toString(hbi.getTableName()) + " in hdfs dir " + + hbi.getHdfsRegionDir() + + "! It may be an invalid format or version file. Treating as " + + "an orphaned regiondir."; + errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg); try { - hbck.loadHdfsRegioninfo(hbi); - } catch (IOException ioe) { - String msg = "Orphan region in HDFS: Unable to load .regioninfo from table " - + Bytes.toString(hbi.getTableName()) + " in hdfs dir " - + hbi.getHdfsRegionDir() - + "! It may be an invalid format or version file. Treating as " - + "an orphaned regiondir."; - errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg); - try { - hbck.debugLsr(hbi.getHdfsRegionDir()); - } catch (IOException ioe2) { - LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2); - return; // TODO convert this in to a future - } - hbck.orphanHdfsDirs.add(hbi); - return; + hbck.debugLsr(hbi.getHdfsRegionDir()); + } catch (IOException ioe2) { + LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2); + throw ioe2; } + hbck.orphanHdfsDirs.add(hbi); + throw ioe; } - } finally { - done = true; - notifyAll(); } + return null; } };