HBASE-3721 Speedup LoadIncrementalHFiles

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1100045 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-05-06 04:26:58 +00:00
parent 3450c962ad
commit f3ebb53efc
2 changed files with 128 additions and 58 deletions

View File

@ -206,6 +206,7 @@ Release 0.91.0 - Unreleased
HBASE-3670 Fix error handling in get(List<Get> gets)
(Harsh J Chouraria)
HBASE-3835 Switch master and region server pages to Jamon-based templates
HBASE-3721 Speedup LoadIncrementalHFiles (Ted Yu)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -21,10 +21,22 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.ArrayList;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,10 +46,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
@ -50,12 +65,11 @@ import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.util.TreeMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@ -67,23 +81,19 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
private static final int TABLE_CREATE_MAX_RETRIES = 20;
private static final long TABLE_CREATE_SLEEP = 60000;
static AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
private Configuration cfg;
private Set<Future> futures = new HashSet<Future>();
public static String NAME = "completebulkload";
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
this.cfg = conf;
this.hbAdmin = new HBaseAdmin(conf);
}
/* This constructor does not add HBase configuration.
* Explicit addition is necessary. Do we need this constructor?
*/
public LoadIncrementalHFiles() {
super();
}
private void usage() {
System.err.println("usage: " + NAME +
" /path/to/hfileoutputformat-output " +
@ -165,13 +175,38 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
Deque<LoadQueueItem> queue = null;
int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
builder.build());
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
try {
queue = discoverLoadQueue(hfofDir);
// outer loop picks up LoadQueueItem due to HFile split
while (!queue.isEmpty()) {
Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys();
// inner loop groups callables
while (!queue.isEmpty()) {
LoadQueueItem item = queue.remove();
tryLoad(item, conn, table.getTableName(), queue);
tryLoad(item, conn, table, queue, startEndKeys, pool);
}
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (ExecutionException ee) {
LOG.error(ee);
} catch (InterruptedException ie) {
LOG.error(ie);
}
}
} finally {
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
@ -185,15 +220,48 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
// unique file name for the table
String getUniqueName(byte[] tableName) {
String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
return name;
}
void splitStoreFileAndRequeue(final LoadQueueItem item,
final Deque<LoadQueueItem> queue, final HTable table,
byte[] startKey, byte[] splitKey) throws IOException {
final Path hfilePath = item.hfilePath;
// We use a '_' prefix which is ignored when walking directory trees
// above.
final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
String uniqueName = getUniqueName(table.getTableName());
HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
Path botOut = new Path(tmpDir, uniqueName + ".bottom");
Path topOut = new Path(tmpDir, uniqueName + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
botOut, topOut);
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
}
/**
* Attempt to load the given load queue item into its target region server.
* If the hfile boundary no longer fits into a region, physically splits
* the hfile such that the new bottom half will fit, and adds the two
* resultant hfiles back into the load queue.
*/
private void tryLoad(final LoadQueueItem item,
HConnection conn, final byte[] table,
final Deque<LoadQueueItem> queue)
private boolean tryLoad(final LoadQueueItem item,
final HConnection conn, final HTable table,
final Deque<LoadQueueItem> queue, Pair<byte[][],byte[][]> startEndKeys,
ExecutorService pool)
throws IOException {
final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf());
@ -213,43 +281,44 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
if (first == null || last == null) {
assert first == null && last == null;
LOG.info("hfile " + hfilePath + " has no entries, skipping");
return;
return false;
}
if (Bytes.compareTo(first, last) > 0) {
throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(first) +
" > " + Bytes.toStringBinary(last));
}
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
if (idx < 0) {
idx = -(idx+1)-1;
}
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
splitStoreFileAndRequeue(item, queue, table,
startEndKeys.getFirst()[idx], startEndKeys.getSecond()[idx]);
return true;
}
// We use a '_' prefix which is ignored when walking directory trees
// above.
final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");
conn.getRegionServerWithRetries(
new ServerCallable<Void>(conn, table, first) {
final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) {
@Override
public Void call() throws Exception {
LOG.debug("Going to connect to server " + location +
"for row " + Bytes.toStringBinary(row));
HRegionInfo hri = location.getRegionInfo();
if (!hri.containsRange(first, last)) {
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
botOut, topOut);
// Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there.
queue.addFirst(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut));
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
return null;
}
byte[] regionName = location.getRegionInfo().getRegionName();
server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
return null;
}
});
};
Callable<Void> callable = new Callable<Void>() {
public Void call() throws Exception {
return conn.getRegionServerWithRetries(svrCallable);
}
};
futures.add(pool.submit(callable));
return false;
}
/**
@ -419,7 +488,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
}
HTable table = new HTable(tableName);
HTable table = new HTable(this.cfg, tableName);
HConnection conn = table.getConnection();
int ctr = 0;
@ -446,7 +515,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
if (!tableExists) this.createTable(tableName,dirPath);
Path hfofDir = new Path(dirPath);
HTable table = new HTable(tableName);
HTable table = new HTable(this.cfg, tableName);
doBulkLoad(hfofDir, table);
return 0;