HBASE-4552 multi-CF bulk load is not atomic across column families (Jonathan Hsieh)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1195574 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-10-31 17:26:42 +00:00
parent fb97e1381a
commit e32b161c8f
10 changed files with 1004 additions and 112 deletions

View File

@ -430,6 +430,7 @@ Release 0.92.0 - Unreleased
to move a region to move a region
HBASE-4613 hbase.util.Threads#threadDumpingIsAlive sleeps 1 second, HBASE-4613 hbase.util.Threads#threadDumpingIsAlive sleeps 1 second,
slowing down the shutdown by 0.5s slowing down the shutdown by 0.5s
HBASE-4552 multi-CF bulk load is not atomic across column families (Jonathan Hsieh)
TESTS TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.ipc.VersionedProtocol;
@ -62,7 +63,7 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
// maintained a single global version number on all HBase Interfaces. This // maintained a single global version number on all HBase Interfaces. This
// meant all HBase RPC was broke though only one of the three RPC Interfaces // meant all HBase RPC was broke though only one of the three RPC Interfaces
// had changed. This has since been undone. // had changed. This has since been undone.
public static final long VERSION = 28L; public static final long VERSION = 29L;
/** /**
* Get metainfo about an HRegion * Get metainfo about an HRegion
@ -323,9 +324,13 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException; public <R> MultiResponse multi(MultiAction<R> multi) throws IOException;
/** /**
* Bulk load an HFile into an open region * Atomically bulk load multiple HFiles (say from different column families)
* into an open region.
*
* @param familyPaths List of (family, hfile path) pairs
* @param regionName name of region to load hfiles into
*/ */
public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName) public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[] regionName)
throws IOException; throws IOException;
// Master methods // Master methods

View File

@ -21,13 +21,16 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Deque; import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -37,7 +40,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -62,9 +64,9 @@ import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -72,9 +74,11 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Tool to load the output of HFileOutputFormat into an existing table. * Tool to load the output of HFileOutputFormat into an existing table.
* @see #usage() * @see #usage()
@ -87,8 +91,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
static AtomicLong regionCount = new AtomicLong(0); static AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin; private HBaseAdmin hbAdmin;
private Configuration cfg; private Configuration cfg;
private Set<Future> futures = new HashSet<Future>();
private Set<Future> futuresForSplittingHFile = new HashSet<Future>();
public static String NAME = "completebulkload"; public static String NAME = "completebulkload";
@ -112,7 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* region boundary, and each part is added back into the queue. * region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty. * The import process finishes when the queue is empty.
*/ */
private static class LoadQueueItem { static class LoadQueueItem {
final byte[] family; final byte[] family;
final Path hfilePath; final Path hfilePath;
@ -120,13 +122,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
this.family = family; this.family = family;
this.hfilePath = hfilePath; this.hfilePath = hfilePath;
} }
public String toString() {
return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
}
} }
/** /**
* Walk the given directory for all HFiles, and return a Queue * Walk the given directory for all HFiles, and return a Queue
* containing all such files. * containing all such files.
*/ */
private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir) private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
throws IOException { throws IOException {
FileSystem fs = hfofDir.getFileSystem(getConf()); FileSystem fs = hfofDir.getFileSystem(getConf());
@ -140,7 +146,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new FileNotFoundException("No families found in " + hfofDir); throw new FileNotFoundException("No families found in " + hfofDir);
} }
Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
for (FileStatus stat : familyDirStatuses) { for (FileStatus stat : familyDirStatuses) {
if (!stat.isDir()) { if (!stat.isDir()) {
LOG.warn("Skipping non-directory " + stat.getPath()); LOG.warn("Skipping non-directory " + stat.getPath());
@ -156,7 +161,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
ret.add(new LoadQueueItem(family, hfile)); ret.add(new LoadQueueItem(family, hfile));
} }
} }
return ret;
} }
/** /**
@ -167,10 +171,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into * @param table the table to load into
* @throws TableNotFoundException if table does not yet exist * @throws TableNotFoundException if table does not yet exist
*/ */
public void doBulkLoad(Path hfofDir, HTable table) public void doBulkLoad(Path hfofDir, final HTable table)
throws TableNotFoundException, IOException throws TableNotFoundException, IOException
{ {
HConnection conn = table.getConnection(); final HConnection conn = table.getConnection();
if (!conn.isTableAvailable(table.getTableName())) { if (!conn.isTableAvailable(table.getTableName())) {
throw new TableNotFoundException("Table " + throw new TableNotFoundException("Table " +
@ -178,54 +182,51 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"is not currently available."); "is not currently available.");
} }
Deque<LoadQueueItem> queue = null; // initialize thread pools
int nrThreads = cfg.getInt("hbase.loadincremental.threads.max", int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors()); Runtime.getRuntime().availableProcessors());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("LoadIncrementalHFiles-%1$d"); builder.setNameFormat("LoadIncrementalHFiles-%1$d");
ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new LinkedBlockingQueue<Runnable>(),
builder.build()); builder.build());
((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true); ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
try { try {
queue = discoverLoadQueue(hfofDir); discoverLoadQueue(queue, hfofDir);
// outer loop picks up LoadQueueItem due to HFile split int count = 0;
while (!queue.isEmpty() || futuresForSplittingHFile.size() > 0) {
Pair<byte[][],byte[][]> startEndKeys = table.getStartEndKeys(); // Assumes that region splits can happen while this occurs.
// inner loop groups callables while (!queue.isEmpty()) {
while (!queue.isEmpty()) { // need to reload split keys each iteration.
LoadQueueItem item = queue.remove(); final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
tryLoad(item, conn, table, queue, startEndKeys, pool); if (count != 0) {
LOG.info("Split occured while grouping HFiles, retry attempt " +
+ count + " with " + queue.size() + " files remaining to load");
} }
Iterator<Future> iter = futuresForSplittingHFile.iterator();
while (iter.hasNext()) { int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 10);
boolean timeoutSeen = false; if (count >= maxRetries) {
Future future = iter.next(); LOG.error("Retry attempted " + count + " times without completing, bailing out");
try { return;
future.get(20, TimeUnit.MILLISECONDS);
break; // we have at least two new HFiles to process
} catch (ExecutionException ee) {
LOG.error(ee);
} catch (InterruptedException ie) {
LOG.error(ie);
} catch (TimeoutException te) {
timeoutSeen = true;
} finally {
if (!timeoutSeen) iter.remove();
}
}
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (ExecutionException ee) {
LOG.error(ee);
} catch (InterruptedException ie) {
LOG.error(ie);
} }
count++;
// Using ByteBuffer for byte[] equality semantics
Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
pool, queue, startEndKeys);
bulkLoadPhase(table, conn, pool, queue, regionGroups);
// NOTE: The next iteration's split / group could happen in parallel to
// atomic bulkloads assuming that there are splits and no merges, and
// that we can atomically pull out the groups we want to retry.
} }
} finally { } finally {
pool.shutdown(); pool.shutdown();
if (queue != null && !queue.isEmpty()) { if (queue != null && !queue.isEmpty()) {
@ -241,15 +242,111 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} }
} }
/**
* This takes the LQI's grouped by likely regions and attempts to bulk load
* them. Any failures are re-queued for another pass with the
* groupOrSplitPhase.
*/
private void bulkLoadPhase(final HTable table, final HConnection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
// atomically bulk load the groups.
Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
final byte[] first = e.getKey().array();
final Collection<LoadQueueItem> lqis = e.getValue();
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> toRetry = tryAtomicRegionLoad(conn, table.getTableName(), first, lqis);
return toRetry;
}
};
loadingFutures.add(pool.submit(call));
}
// get all the results.
for (Future<List<LoadQueueItem>> future : loadingFutures) {
try {
List<LoadQueueItem> toRetry = future.get();
if (toRetry != null && toRetry.size() != 0) {
// LQIs that are requeued to be regrouped.
queue.addAll(toRetry);
}
} catch (ExecutionException e1) {
Throwable t = e1.getCause();
if (t instanceof IOException) {
LOG.error("IOException during bulk load", e1);
throw (IOException)t; // would have been thrown if not parallelized,
}
LOG.error("Unexpected execution exception during bulk load", e1);
throw new IllegalStateException(t);
} catch (InterruptedException e1) {
LOG.error("Unexpected interrupted exception during bulk load", e1);
throw new IllegalStateException(e1);
}
}
}
/**
* @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
* bulk load region targets.
*/
private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
// <region start key, LQI> need synchronized only within this scope of this
// phase because of the puts that happen in futures.
Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
// drain LQIs and figure out bulk load groups
Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
return splits;
}
};
splittingFutures.add(pool.submit(call));
}
// get all the results. All grouping and splitting must finish before
// we can attempt the atomic loads.
for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
try {
List<LoadQueueItem> splits = lqis.get();
if (splits != null) {
queue.addAll(splits);
}
} catch (ExecutionException e1) {
Throwable t = e1.getCause();
if (t instanceof IOException) {
LOG.error("IOException during splitting", e1);
throw (IOException)t; // would have been thrown if not parallelized,
}
LOG.error("Unexpected execution exception during splitting", e1);
throw new IllegalStateException(t);
} catch (InterruptedException e1) {
LOG.error("Unexpected interrupted exception during splitting", e1);
throw new IllegalStateException(e1);
}
}
return regionGroups;
}
// unique file name for the table // unique file name for the table
String getUniqueName(byte[] tableName) { String getUniqueName(byte[] tableName) {
String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet(); String name = Bytes.toStringBinary(tableName) + "," + regionCount.incrementAndGet();
return name; return name;
} }
void splitStoreFileAndRequeue(final LoadQueueItem item, protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
final Deque<LoadQueueItem> queue, final HTable table, final HTable table, byte[] startKey,
byte[] startKey, byte[] splitKey) throws IOException { byte[] splitKey) throws IOException {
final Path hfilePath = item.hfilePath; final Path hfilePath = item.hfilePath;
// We use a '_' prefix which is ignored when walking directory trees // We use a '_' prefix which is ignored when walking directory trees
@ -268,25 +365,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// Add these back at the *front* of the queue, so there's a lower // Add these back at the *front* of the queue, so there's a lower
// chance that the region will just split again before we get there. // chance that the region will just split again before we get there.
synchronized (queue) { List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
queue.addFirst(new LoadQueueItem(item.family, botOut)); lqis.add(new LoadQueueItem(item.family, botOut));
queue.addFirst(new LoadQueueItem(item.family, topOut)); lqis.add(new LoadQueueItem(item.family, topOut));
}
LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
return lqis;
} }
/** /**
* Attempt to load the given load queue item into its target region server. * Attempt to assign the given load queue item into its target region group.
* If the hfile boundary no longer fits into a region, physically splits * If the hfile boundary no longer fits into a region, physically splits
* the hfile such that the new bottom half will fit, and adds the two * the hfile such that the new bottom half will fit and returns the list of
* resultant hfiles back into the load queue. * LQI's corresponding to the resultant hfiles.
*
* protected for testing
*/ */
private boolean tryLoad(final LoadQueueItem item, protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final HConnection conn, final HTable table, final LoadQueueItem item, final HTable table,
final Deque<LoadQueueItem> queue, final Pair<byte[][], byte[][]> startEndKeys)
final Pair<byte[][],byte[][]> startEndKeys, throws IOException {
ExecutorService pool)
throws IOException {
final Path hfilePath = item.hfilePath; final Path hfilePath = item.hfilePath;
final FileSystem fs = hfilePath.getFileSystem(getConf()); final FileSystem fs = hfilePath.getFileSystem(getConf());
HFile.Reader hfr = HFile.createReader(fs, hfilePath, HFile.Reader hfr = HFile.createReader(fs, hfilePath,
@ -305,54 +403,80 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
" last=" + Bytes.toStringBinary(last)); " last=" + Bytes.toStringBinary(last));
if (first == null || last == null) { if (first == null || last == null) {
assert first == null && last == null; assert first == null && last == null;
// TODO what if this is due to a bad HFile?
LOG.info("hfile " + hfilePath + " has no entries, skipping"); LOG.info("hfile " + hfilePath + " has no entries, skipping");
return false; return null;
} }
if (Bytes.compareTo(first, last) > 0) { if (Bytes.compareTo(first, last) > 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Invalid range: " + Bytes.toStringBinary(first) + "Invalid range: " + Bytes.toStringBinary(first) +
" > " + Bytes.toStringBinary(last)); " > " + Bytes.toStringBinary(last));
} }
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR); int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
Bytes.BYTES_COMPARATOR);
if (idx < 0) { if (idx < 0) {
idx = -(idx+1)-1; // not on boundary, returns -(insertion index). Calculate region it
// would be in.
idx = -(idx + 1) - 1;
} }
final int indexForCallable = idx; final int indexForCallable = idx;
boolean lastKeyInRange = boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 || Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY); Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) { if (!lastKeyInRange) {
Callable<Void> callable = new Callable<Void>() { List<LoadQueueItem> lqis = splitStoreFile(item, table,
public Void call() throws Exception { startEndKeys.getFirst()[indexForCallable],
splitStoreFileAndRequeue(item, queue, table, startEndKeys.getSecond()[indexForCallable]);
startEndKeys.getFirst()[indexForCallable], return lqis;
startEndKeys.getSecond()[indexForCallable]);
return (Void)null;
}
};
futuresForSplittingHFile.add(pool.submit(callable));
return true;
} }
final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn, table.getTableName(), first) { // group regions.
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
return null;
}
/**
* Attempts to do an atomic load of many hfiles into a region. If it fails,
* it returns a list of hfiles that need to be retried. If it is successful
* it will return an empty list.
*
* NOTE: To maintain row atomicity guarantees, region server callable should
* succeed atomically and fails atomically.
*
* Protected for testing.
*/
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
final List<Pair<byte[], String>> famPaths =
new ArrayList<Pair<byte[], String>>(lqis.size());
for (LoadQueueItem lqi : lqis) {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
final ServerCallable<Void> svrCallable = new ServerCallable<Void>(conn,
tableName, first) {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
LOG.debug("Going to connect to server " + location + LOG.debug("Going to connect to server " + location + " for row "
"for row " + Bytes.toStringBinary(row)); + Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName(); byte[] regionName = location.getRegionInfo().getRegionName();
server.bulkLoadHFile(hfilePath.toString(), regionName, item.family); server.bulkLoadHFiles(famPaths, regionName);
return null; return null;
} }
}; };
Callable<Void> callable = new Callable<Void>() {
public Void call() throws Exception { List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
return conn.getRegionServerWithRetries(svrCallable); try {
} conn.getRegionServerWithRetries(svrCallable);
}; } catch (IOException e) {
futures.add(pool.submit(callable)); LOG.warn("Attempt to bulk load region containing "
return false; + Bytes.toStringBinary(first) + " into table "
+ Bytes.toStringBinary(tableName) + " with files " + lqis
+ " failed");
toRetry.addAll(lqis);
}
return toRetry;
} }
/** /**
@ -559,7 +683,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args); int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
System.exit(ret);
} }
} }

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter; import org.cliffc.high_scale_lib.Counter;
@ -2781,24 +2782,90 @@ public class HRegion implements HeapSize { // , Writable{
return lid; return lid;
} }
public void bulkLoadHFile(String hfilePath, byte[] familyName) /**
* Attempts to atomically load a group of hfiles. This is critical for loading
* rows with multiple column families atomically.
*
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
*/
public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
throws IOException { throws IOException {
startRegionOperation(); startBulkRegionOperation();
this.writeRequestsCount.increment(); this.writeRequestsCount.increment();
List<IOException> ioes = new ArrayList<IOException>();
List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
boolean rangesOk = true;
try { try {
Store store = getStore(familyName); // There possibly was a split that happend between when the split keys
if (store == null) { // were gathered and before the HReiogn's write lock was taken. We need
throw new DoNotRetryIOException( // to validate the HFile region before attempting to bulk load all of them
"No such column family " + Bytes.toStringBinary(familyName)); for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
Store store = getStore(familyName);
if (store == null) {
IOException ioe = new DoNotRetryIOException(
"No such column family " + Bytes.toStringBinary(familyName));
ioes.add(ioe);
failures.add(p);
}
try {
store.assertBulkLoadHFileOk(new Path(path));
} catch (IOException ioe) {
rangesOk = false;
ioes.add(ioe);
failures.add(p);
}
}
if (ioes.size() != 0) {
// validation failed, bail out before doing anything permanent.
return;
}
for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
Store store = getStore(familyName);
try {
store.bulkLoadHFile(path);
} catch (IOException ioe) {
// a failure here causes an atomicity violation that we currently
// cannot recover from since it is likely a failed hdfs operation.
ioes.add(ioe);
failures.add(p);
break;
}
} }
store.bulkLoadHFile(hfilePath);
} finally { } finally {
closeRegionOperation(); closeBulkRegionOperation();
if (ioes.size() != 0) {
StringBuilder list = new StringBuilder();
for (Pair<byte[], String> p : failures) {
list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
.append(p.getSecond());
}
if (rangesOk) {
// TODO Need a better story for reverting partial failures due to HDFS.
LOG.error("There was a partial failure due to IO. These " +
"(family,hfile) pairs were not loaded: " + list);
} else {
// problem when validating
LOG.info("There was a recoverable bulk load failure likely due to a" +
" split. These (family, HFile) pairs were not loaded: " + list);
}
if (ioes.size() == 1) {
throw ioes.get(0);
}
throw MultipleIOException.createIOException(ioes);
}
} }
} }
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (!(o instanceof HRegion)) { if (!(o instanceof HRegion)) {
@ -4379,6 +4446,34 @@ public class HRegion implements HeapSize { // , Writable{
lock.readLock().unlock(); lock.readLock().unlock();
} }
/**
* This method needs to be called before any public call that reads or
* modifies stores in bulk. It has to be called just before a try.
* #closeBulkRegionOperation needs to be called in the try's finally block
* Acquires a writelock and checks if the region is closing or closed.
* @throws NotServingRegionException when the region is closing or closed
*/
private void startBulkRegionOperation() throws NotServingRegionException {
if (this.closing.get()) {
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing");
}
lock.writeLock().lock();
if (this.closed.get()) {
lock.writeLock().unlock();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closed");
}
}
/**
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of #startRegionOperation
*/
private void closeBulkRegionOperation(){
lock.writeLock().unlock();
}
/** /**
* A mocked list implementaion - discards all updates. * A mocked list implementaion - discards all updates.
*/ */

View File

@ -2434,12 +2434,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
} }
} }
/**
* Atomically bulk load several HFiles into an open region
*/
@Override @Override
public void bulkLoadHFile(String hfilePath, byte[] regionName, public void bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
byte[] familyName) throws IOException { byte[] regionName) throws IOException {
checkOpen(); checkOpen();
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
region.bulkLoadHFile(hfilePath, familyName); region.bulkLoadHFiles(familyPaths);
} }
Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>(); Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

View File

@ -345,9 +345,12 @@ public class Store implements HeapSize {
return this.storefiles; return this.storefiles;
} }
public void bulkLoadHFile(String srcPathStr) throws IOException { /**
Path srcPath = new Path(srcPathStr); * This throws a WrongRegionException if the bulkHFile does not fit in this
* region.
*
*/
void assertBulkLoadHFileOk(Path srcPath) throws IOException {
HFile.Reader reader = null; HFile.Reader reader = null;
try { try {
LOG.info("Validating hfile at " + srcPath + " for inclusion in " LOG.info("Validating hfile at " + srcPath + " for inclusion in "
@ -371,12 +374,21 @@ public class Store implements HeapSize {
HRegionInfo hri = region.getRegionInfo(); HRegionInfo hri = region.getRegionInfo();
if (!hri.containsRange(firstKey, lastKey)) { if (!hri.containsRange(firstKey, lastKey)) {
throw new WrongRegionException( throw new WrongRegionException(
"Bulk load file " + srcPathStr + " does not fit inside region " "Bulk load file " + srcPath.toString() + " does not fit inside region "
+ this.region); + this.region);
} }
} finally { } finally {
if (reader != null) reader.close(); if (reader != null) reader.close();
} }
}
/**
* This method should only be called from HRegion. It is assumed that the
* ranges of values in the HFile fit within the stores assigned region.
* (assertBulkLoadHFileOk checks this)
*/
void bulkLoadHFile(String srcPathStr) throws IOException {
Path srcPath = new Path(srcPathStr);
// Move the file if it's on another filesystem // Move the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf); FileSystem srcFs = srcPath.getFileSystem(conf);

View File

@ -129,6 +129,14 @@
server, getting a cell's value, starting a row update, etc. server, getting a cell's value, starting a row update, etc.
Default: 10. Default: 10.
</description> </description>
</property>
<property>
<name>hbase.bulkload.retries.number</name>
<value>10</value>
<description>Maximum retries. This is maximum number of iterations
to atomic bulk loads are attempted in the face of splitting operations
Default: 10.
</description>
</property> </property>
<property> <property>
<name>hbase.client.scanner.caching</name> <name>hbase.client.scanner.caching</name>

View File

@ -0,0 +1,324 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.Multimap;
/**
* Test cases for the atomic load error handling of the bulk load functionality.
*/
public class TestLoadIncrementalHFilesSplitRecovery {
final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
private static HBaseTestingUtility util;
final static int NUM_CFS = 10;
final static byte[] QUAL = Bytes.toBytes("qual");
final static int ROWCOUNT = 100;
private final static byte[][] families = new byte[NUM_CFS][];
static {
for (int i = 0; i < NUM_CFS; i++) {
families[i] = Bytes.toBytes(family(i));
}
}
static byte[] rowkey(int i) {
return Bytes.toBytes(String.format("row_%08d", i));
}
static String family(int i) {
return String.format("family_%04d", i);
}
static byte[] value(int i) {
return Bytes.toBytes(String.format("%010d", i));
}
public static void buildHFiles(FileSystem fs, Path dir, int value)
throws IOException {
byte[] val = value(value);
for (int i = 0; i < NUM_CFS; i++) {
Path testIn = new Path(dir, family(i));
TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
}
}
/**
* Creates a table with given table name and specified number of column
* families if the table does not already exist.
*/
private void setupTable(String table, int cfs) throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);
for (int i = 0; i < 10; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
HBaseAdmin admin = util.getHBaseAdmin();
admin.createTable(htd);
} catch (TableExistsException tee) {
LOG.info("Table " + table + " already exists");
}
}
@BeforeClass
public static void setupCluster() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster(1);
}
@AfterClass
public static void teardownCluster() throws Exception {
util.shutdownMiniCluster();
}
void assertExpectedTable(String table, int count, int value) {
try {
assertEquals(util.getHBaseAdmin().listTables(table).length, 1);
HTable t = new HTable(util.getConfiguration(), table);
Scan s = new Scan();
ResultScanner sr = t.getScanner(s);
int i = 0;
for (Result r : sr) {
i++;
for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
for (byte[] val : nm.values()) {
assertTrue(Bytes.equals(val, value(value)));
}
}
}
assertEquals(count, i);
} catch (IOException e) {
fail("Failed due to exception");
}
}
@Test
public void testBulkLoadPhaseRecovery() throws Exception {
String table = "bulkPhaseRetry";
setupTable(table, 10);
final AtomicInteger attmptedCalls = new AtomicInteger();
final AtomicInteger failedCalls = new AtomicInteger();
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
HConnection errConn = mock(HConnection.class);
try {
doThrow(new IOException("injecting bulk load error")).when(errConn)
.getRegionServerWithRetries((ServerCallable) anyObject());
} catch (Exception e) {
LOG.fatal("mocking cruft, should never happen", e);
throw new RuntimeException("mocking cruft, should never happen");
}
failedCalls.incrementAndGet();
return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
}
return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
}
};
// create HFiles for different column families
FileSystem fs = util.getTestFileSystem();
Path dir = util.getDataTestDir(table);
buildHFiles(fs, dir, 1);
HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
lih.doBulkLoad(dir, t);
// check that data was loaded
assertEquals(attmptedCalls.get(), 2);
assertEquals(failedCalls.get(), 1);
assertExpectedTable(table, ROWCOUNT, 1);
}
/**
* This test exercises the path where there is a split after initial
* validation but before the atomic bulk load call. We cannot use presplitting
* to test this path, so we actually inject a split just before the atomic
* region load.
*/
@Test
public void testSplitWhileBulkLoadPhase() throws Exception {
final String table = "bulkPhaseSplit";
setupTable(table, 10);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration());
// create HFiles for different column families
Path dir = util.getDataTestDir(table);
Path bulk1 = new Path(dir, "normalBulkload");
FileSystem fs = util.getTestFileSystem();
buildHFiles(fs, bulk1, 1);
HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
lih.doBulkLoad(bulk1, t);
assertExpectedTable(table, ROWCOUNT, 1);
// Now let's cause trouble
final AtomicInteger attmptedCalls = new AtomicInteger();
LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
util.getConfiguration()) {
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
// On first attempt force a split.
try {
// need to call regions server to by synchronous but isn't visible.
HRegionServer hrs = util.getRSForFirstRegionInTable(Bytes
.toBytes(table));
HRegionInfo region = null;
for (HRegionInfo hri : hrs.getOnlineRegions()) {
if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
// splitRegion doesn't work if startkey/endkey are null
hrs.splitRegion(hri, rowkey(ROWCOUNT / 2)); // hard code split
}
}
int regions;
do {
regions = 0;
for (HRegionInfo hri : hrs.getOnlineRegions()) {
if (Bytes.equals(hri.getTableName(), Bytes.toBytes(table))) {
regions++;
}
}
if (regions != 2) {
LOG.info("Taking some time to complete split...");
Thread.sleep(250);
}
} while (regions != 2);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
}
};
// create HFiles for different column families
Path bulk2 = new Path(dir, "bulkload2");
buildHFiles(fs, bulk2, 2); // all values are '2'
lih2.doBulkLoad(bulk2, t);
// check that data was loaded
// The three expected attempts are 1) failure because need to split, 2)
// load of split top 3) load of split bottom
assertEquals(attmptedCalls.get(), 3);
assertExpectedTable(table, ROWCOUNT, 2);
}
@Test(expected = IOException.class)
public void testGroupOrSplitFailure() throws Exception {
String table = "groupOrSplitStoreFail";
setupTable(table, 10);
try {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
int i = 0;
protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
i++;
if (i == 5) {
throw new IOException("failure");
}
return super.groupOrSplit(regionGroups, item, table, startEndKeys);
}
};
Path dir = util.getDataTestDir(table);
// create HFiles for different column families
FileSystem fs = util.getTestFileSystem();
buildHFiles(fs, dir, 1);
HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
lih.doBulkLoad(dir, t);
// check that data was loaded
assertExpectedTable(table, ROWCOUNT, 1);
} finally {
util.shutdownMiniCluster();
}
}
}

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
* the region server's bullkLoad functionality.
*/
public class TestHRegionServerBulkLoad {
final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private final static Configuration conf = UTIL.getConfiguration();
private final static byte[] QUAL = Bytes.toBytes("qual");
private final static int NUM_CFS = 10;
public static int BLOCKSIZE = 64 * 1024;
public static String COMPRESSION = Compression.Algorithm.NONE.getName();
private final static byte[][] families = new byte[NUM_CFS][];
static {
for (int i = 0; i < NUM_CFS; i++) {
families[i] = Bytes.toBytes(family(i));
}
}
static byte[] rowkey(int i) {
return Bytes.toBytes(String.format("row_%08d", i));
}
static String family(int i) {
return String.format("family_%04d", i);
}
/**
* Create an HFile with the given number of rows with a specified value.
*/
public static void createHFile(FileSystem fs, Path path, byte[] family,
byte[] qualifier, byte[] value, int numRows) throws IOException {
HFile.Writer writer = HFile
.getWriterFactory(conf, new CacheConfig(conf))
.createWriter(fs, path, BLOCKSIZE, COMPRESSION, KeyValue.KEY_COMPARATOR);
long now = System.currentTimeMillis();
try {
// subtract 2 since iterateOnSplits doesn't include boundary keys
for (int i = 0; i < numRows; i++) {
KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
writer.append(kv);
}
} finally {
writer.close();
}
}
/**
* Thread that does full scans of the table looking for any partially
* completed rows.
*
* Each iteration of this loads 10 hdfs files, which occupies 5 file open file
* handles. So every 10 iterations (500 file handles) it does a region
* compaction to reduce the number of open file handles.
*/
public static class AtomicHFileLoader extends RepeatingTestThread {
final AtomicLong numBulkLoads = new AtomicLong();
final AtomicLong numCompactions = new AtomicLong();
private String tableName;
public AtomicHFileLoader(String tableName, TestContext ctx,
byte targetFamilies[][]) throws IOException {
super(ctx);
this.tableName = tableName;
}
public void doAnAction() throws Exception {
long iteration = numBulkLoads.getAndIncrement();
Path dir = UTIL.getDataTestDir(String.format("bulkLoad_%08d",
iteration));
// create HFiles for different column families
FileSystem fs = UTIL.getTestFileSystem();
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
NUM_CFS);
for (int i = 0; i < NUM_CFS; i++) {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
}
// bulk load HFiles
HConnection conn = UTIL.getHBaseAdmin().getConnection();
byte[] tbl = Bytes.toBytes(tableName);
conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
.toBytes("aaa")) {
@Override
public Void call() throws Exception {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
server.bulkLoadHFiles(famPaths, regionName);
return null;
}
});
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
// 10 * 50 = 500 open file handles!
conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
Bytes.toBytes("aaa")) {
@Override
public Void call() throws Exception {
LOG.debug("compacting " + location + " for row "
+ Bytes.toStringBinary(row));
server.compactRegion(location.getRegionInfo(), true);
numCompactions.incrementAndGet();
return null;
}
});
}
}
}
/**
* Thread that does full scans of the table looking for any partially
* completed rows.
*/
public static class AtomicScanReader extends RepeatingTestThread {
byte targetFamilies[][];
HTable table;
AtomicLong numScans = new AtomicLong();
AtomicLong numRowsScanned = new AtomicLong();
String TABLE_NAME;
public AtomicScanReader(String TABLE_NAME, TestContext ctx,
byte targetFamilies[][]) throws IOException {
super(ctx);
this.TABLE_NAME = TABLE_NAME;
this.targetFamilies = targetFamilies;
table = new HTable(conf, TABLE_NAME);
}
public void doAnAction() throws Exception {
Scan s = new Scan();
for (byte[] family : targetFamilies) {
s.addFamily(family);
}
ResultScanner scanner = table.getScanner(s);
for (Result res : scanner) {
byte[] lastRow = null, lastFam = null, lastQual = null;
byte[] gotValue = null;
for (byte[] family : targetFamilies) {
byte qualifier[] = QUAL;
byte thisValue[] = res.getValue(family, qualifier);
if (gotValue != null && thisValue != null
&& !Bytes.equals(gotValue, thisValue)) {
StringBuilder msg = new StringBuilder();
msg.append("Failed on scan ").append(numScans)
.append(" after scanning ").append(numRowsScanned)
.append(" rows!\n");
msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
+ Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+ " = " + Bytes.toString(thisValue) + "\n");
msg.append("Previous was " + Bytes.toString(lastRow) + "/"
+ Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+ " = " + Bytes.toString(gotValue));
throw new RuntimeException(msg.toString());
}
lastFam = family;
lastQual = qualifier;
lastRow = res.getRow();
gotValue = thisValue;
}
numRowsScanned.getAndIncrement();
}
numScans.getAndIncrement();
}
}
/**
* Creates a table with given table name and specified number of column
* families if the table does not already exist.
*/
private void setupTable(String table, int cfs) throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);
for (int i = 0; i < 10; i++) {
htd.addFamily(new HColumnDescriptor(family(i)));
}
HBaseAdmin admin = UTIL.getHBaseAdmin();
admin.createTable(htd);
} catch (TableExistsException tee) {
LOG.info("Table " + table + " already exists");
}
}
/**
* Atomic bulk load.
*/
@Test
public void testAtomicBulkLoad() throws Exception {
String TABLE_NAME = "atomicBulkLoad";
int millisToRun = 30000;
int numScanners = 50;
UTIL.startMiniCluster(1);
try {
runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
} finally {
UTIL.shutdownMiniCluster();
}
}
void runAtomicBulkloadTest(String tableName, int millisToRun, int numScanners)
throws Exception {
setupTable(tableName, 10);
TestContext ctx = new TestContext(UTIL.getConfiguration());
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
ctx.addThread(loader);
List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
scanners.add(scanner);
ctx.addThread(scanner);
}
ctx.startThreads();
ctx.waitFor(millisToRun);
ctx.stop();
LOG.info("Loaders:");
LOG.info(" loaded " + loader.numBulkLoads.get());
LOG.info(" compations " + loader.numCompactions.get());
LOG.info("Scanners:");
for (AtomicScanReader scanner : scanners) {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
}
/**
* Run test on an HBase instance for 5 minutes. This assumes that the table
* under test only has a single region.
*/
public static void main(String args[]) throws Exception {
try {
Configuration c = HBaseConfiguration.create();
TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
test.setConf(c);
test.runAtomicBulkloadTest("atomicTableTest", 5 * 60 * 1000, 50);
} finally {
System.exit(0); // something hangs (believe it is lru threadpool)
}
}
private void setConf(Configuration c) {
UTIL = new HBaseTestingUtility(c);
}
}

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.FlushRequester;
@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -205,7 +206,9 @@ public class TestWALReplay {
byte [] row = Bytes.toBytes(tableNameStr); byte [] row = Bytes.toBytes(tableNameStr);
writer.append(new KeyValue(row, family, family, row)); writer.append(new KeyValue(row, family, family, row));
writer.close(); writer.close();
region.bulkLoadHFile(f.toString(), family); List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
hfs.add(Pair.newPair(family, f.toString()));
region.bulkLoadHFiles(hfs);
// Add an edit so something in the WAL // Add an edit so something in the WAL
region.put((new Put(row)).add(family, family, family)); region.put((new Put(row)).add(family, family, family));
wal.sync(); wal.sync();