HBASE-17037 Enhance LoadIncrementalHFiles API to convey loaded files

tedyu 2016-11-13 07:02:18 -08:00
parent dba7ec1b69
commit 9250bf8091
3 changed files with 52 additions and 29 deletions
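With this change, a bulk load reports which region each HFile went to. Below is a minimal sketch of a caller consuming the new Map<LoadQueueItem, ByteBuffer> result, where the value is the start key of the region an item was loaded into. The class name BulkLoadReport, the table my_table, the family cf, and the HFile path are illustrative assumptions, not part of this commit:

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadReport {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("my_table"); // placeholder table
    // Placeholder input: one column family mapped to its staged HFiles.
    Map<byte[], List<Path>> familyToFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    familyToFiles.put(Bytes.toBytes("cf"),
        Arrays.asList(new Path("/staging/cf/hfile-0"))); // placeholder path
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // New in this commit: doBulkLoad returns each loaded item mapped to the
      // start key of the region it was bulk loaded into.
      Map<LoadQueueItem, ByteBuffer> loaded =
          loader.doBulkLoad(familyToFiles, admin, table, locator, false, false);
      for (Map.Entry<LoadQueueItem, ByteBuffer> e : loaded.entrySet()) {
        System.out.println(e.getKey() + " -> region start key "
            + Bytes.toStringBinary(e.getValue()));
      }
    }
  }
}

Per the bulkLoadPhase changes below, items that must be retried are removed from the map and re-added under the region that finally receives them, so the returned map reflects final placement.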

LoadIncrementalHFiles.java

@@ -362,12 +362,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
    * @param copyFile always copy hfiles if true
-   * @return List of filenames which were not found
+   * @return Map of LoadQueueItem to region
    * @throws TableNotFoundException if table does not yet exist
    */
-  public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
-      RegionLocator regionLocator, boolean silence, boolean copyFile)
+  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
+      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
       throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
@@ -449,8 +449,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
     }

-  List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
-      Deque<LoadQueueItem> queue, ExecutorService pool,
+  Map<LoadQueueItem, ByteBuffer> performBulkLoad(final Admin admin, Table table,
+      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
       SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
     int count = 0;
@@ -464,8 +464,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // fs is the source filesystem
     fsDelegationToken.acquireDelegationToken(fs);
     bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
-    Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null;
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
+    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();

     // Assumes that region splits can happen while this occurs.
     while (!queue.isEmpty()) {
       // need to reload split keys each iteration.
@@ -493,7 +494,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
             + " hfiles to one family of one region");
       }

-      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile);
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
+          item2RegionMap);

       // NOTE: The next iteration's split / group could happen in parallel to
       // atomic bulkloads assuming that there are splits and no merges, and
@@ -504,8 +506,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw new RuntimeException("Bulk load aborted with some files not yet loaded."
           + "Please check log for more details.");
     }
-    if (pair == null) return null;
-    return pair.getSecond();
+    return item2RegionMap;
   }

   /**
@@ -630,7 +631,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       pool = createExecutorService();
       Multimap<ByteBuffer, LoadQueueItem> regionGroups =
           groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
-      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
+      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
     } finally {
       if (pool != null) {
         pool.shutdown();
@@ -645,7 +646,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   protected void bulkLoadPhase(final Table table, final Connection conn,
       ExecutorService pool, Deque<LoadQueueItem> queue,
-      final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile) throws IOException {
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+      Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
     // atomically bulk load the groups.
     Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
@@ -660,6 +662,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           return toRetry;
         }
       };
+      if (item2RegionMap != null) {
+        for (LoadQueueItem lqi : lqis) {
+          item2RegionMap.put(lqi, e.getKey());
+        }
+      }
       loadingFutures.add(pool.submit(call));
     }
@@ -668,6 +675,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try {
         List<LoadQueueItem> toRetry = future.get();
+        if (item2RegionMap != null) {
+          for (LoadQueueItem lqi : toRetry) {
+            item2RegionMap.remove(lqi);
+          }
+        }

         // LQIs that are requeued to be regrouped.
         queue.addAll(toRetry);
@@ -717,17 +729,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param pool the ExecutorService
    * @param queue the queue for LoadQueueItem
    * @param startEndKeys start and end keys
-   * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles.
+   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
    */
-  private Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase(
+  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
       final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
       final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
     // <region start key, LQI> need synchronized only within this scope of this
     // phase because of the puts that happen in futures.
     Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
-    List<String> missingHFiles = new ArrayList<>();
-    Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = new Pair<>(regionGroups,
+    Set<String> missingHFiles = new HashSet<>();
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = new Pair<>(regionGroups,
         missingHFiles);

     // drain LQIs and figure out bulk load groups
@@ -942,10 +954,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
       }
     }
-    final ClientServiceCallable<Boolean> svrCallable = new ClientServiceCallable<Boolean>(conn,
+    final ClientServiceCallable<byte[]> svrCallable = new ClientServiceCallable<byte[]>(conn,
         tableName, first, rpcControllerFactory.newController()) {
       @Override
-      protected Boolean rpcCall() throws Exception {
+      protected byte[] rpcCall() throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
         try {
@@ -957,7 +969,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
             success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                 assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
           }
-          return success;
+          return success ? regionName : null;
         } finally {
           //Best effort copying of files that might not have been imported
           //from the staging directory back to original location
@@ -999,10 +1011,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     try {
       List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
-      boolean success = RpcRetryingCallerFactory.instantiate(conf,
-          null).<Boolean> newCaller()
+      byte[] region = RpcRetryingCallerFactory.instantiate(conf,
+          null).<byte[]> newCaller()
           .callWithRetries(svrCallable, Integer.MAX_VALUE);
-      if (!success) {
+      if (region == null) {
         LOG.warn("Attempt to bulk load region containing "
             + Bytes.toStringBinary(first) + " into table "
             + tableName + " with files " + lqis
@@ -1193,7 +1205,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }

-  public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
+  public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
+      TableName tableName) throws Exception{
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1236,8 +1249,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);

-    List<String> missingHFiles = run(dirPath, null, tableName);
-    if (missingHFiles == null) return 0;
+    Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName);
+    if (loaded == null || !loaded.isEmpty()) return 0;
     return -1;
   }
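Note the revised exit-code logic in main() above: with run() now returning the map of loaded items instead of a list of missing HFiles, the tool reports failure (-1) only when a non-null map comes back empty, i.e. when nothing was loaded. A typical invocation is unchanged, e.g. hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /staging/dir my_table (both arguments are placeholders here).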

TestLoadIncrementalHFiles.java

@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
@@ -349,9 +352,13 @@ public class TestLoadIncrementalHFiles {
     String [] args= {dir.toString(), tableName.toString()};
     if (useMap) {
       fs.delete(last);
-      List<String> missingHFiles = loader.run(null, map, tableName);
+      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
       expectedRows -= 1000;
-      assertTrue(missingHFiles.contains(last.getName()));
+      for (LoadQueueItem item : loaded.keySet()) {
+        if (item.hfilePath.getName().equals(last.getName())) {
+          fail(last + " should be missing");
+        }
+      }
     } else {
       loader.run(args);
     }

TestLoadIncrementalHFilesSplitRecovery.java

@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -360,14 +362,15 @@ public class TestLoadIncrementalHFilesSplitRecovery {
       @Override
       protected void bulkLoadPhase(final Table htable, final Connection conn,
           ExecutorService pool, Deque<LoadQueueItem> queue,
-          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile)
+          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+          Map<LoadQueueItem, ByteBuffer> item2RegionMap)
           throws IOException {
         int i = attemptedCalls.incrementAndGet();
         if (i == 1) {
           // On first attempt force a split.
           forceSplit(table);
         }
-        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile);
+        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
       }
     };
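Since bulkLoadPhase() gains a parameter, subclasses that override it (like the test's anonymous LoadIncrementalHFiles above) must adopt the new signature and pass item2RegionMap through to super.bulkLoadPhase(), as shown.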