HBASE-3871 Speedup LoadIncrementalHFiles by parallelizing HFile splitting

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1145459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-07-12 05:15:58 +00:00
parent 0a99255ae1
commit d775dc50c1
2 changed files with 39 additions and 6 deletions

View File

@ -313,6 +313,7 @@ Release 0.91.0 - Unreleased
(Aaron T. Myers via todd)
HBASE-4054 Usability improvement to HTablePool (Daniel Iancu)
HBASE-4079 HTableUtil - helper class for loading data (Doug Meil via Ted Yu)
HBASE-3871 Speedup LoadIncrementalHFiles by parallelizing HFile splitting
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
@ -36,6 +37,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -85,6 +87,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private HBaseAdmin hbAdmin;
private Configuration cfg;
private Set<Future> futures = new HashSet<Future>();
private Set<Future> futuresForSplittingHFile = new HashSet<Future>();
public static String NAME = "completebulkload";
@ -188,13 +191,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
queue = discoverLoadQueue(hfofDir);
// outer loop picks up LoadQueueItem due to HFile split
while (!queue.isEmpty()) {
while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) {
Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
// inner loop groups callables
while (!queue.isEmpty()) {
LoadQueueItem item = queue.remove();
tryLoad(item, conn, table, queue, startEndKeys, pool);
}
Iterator<Future> iter = futuresForSplittingHFile.iterator();
while (iter.hasNext()) {
boolean timeoutSeen = false;
Future future = iter.next();
try {
future.get(20, TimeUnit.MILLISECONDS);
break; // we have at least two new HFiles to process
} catch (ExecutionException ee) {
LOG.error(ee);
} catch (InterruptedException ie) {
LOG.error(ie);
} catch (TimeoutException te) {
timeoutSeen = true;
} finally {
if (!timeoutSeen) iter.remove();
}
}
}
for (Future<Void> future : futures) {
try {
@ -247,8 +267,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
synchronized (queue) {
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
}
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
}
@ -260,7 +282,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
private boolean tryLoad(final LoadQueueItem item,
final HConnection conn, final HTable table,
final Deque<LoadQueueItem> queue, Pair<byte[][],byte[][]> startEndKeys,
final Deque<LoadQueueItem> queue,
final Pair<byte[][],byte[][]> startEndKeys,
ExecutorService pool)
throws IOException {
final Path hfilePath = item.hfilePath;
@ -292,12 +315,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
if (idx < 0) {
idx = -(idx+1)-1;
}
final int indexForCallable = idx;
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
splitStoreFileAndRequeue(item, queue, table,
startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
Callable<Void> callable = new Callable<Void>() {
public Void call() throws Exception {
splitStoreFileAndRequeue(item, queue, table,
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
return (Void)null;
}
};
futuresForSplittingHFile.add(pool.submit(callable));
return true;
}