From 9250bf809155ebe93fd6ae8a0485b22c744fdf70 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 13 Nov 2016 07:02:18 -0800 Subject: [PATCH] HBASE-17037 Enhance LoadIncrementalHFiles API to convey loaded files --- .../mapreduce/LoadIncrementalHFiles.java | 63 +++++++++++-------- .../mapreduce/TestLoadIncrementalHFiles.java | 11 +++- ...estLoadIncrementalHFilesSplitRecovery.java | 7 ++- 3 files changed, 52 insertions(+), 29 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 3647637beb8..574ae18eac8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -362,12 +362,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param regionLocator region locator * @param silence true to ignore unmatched column families * @param copyFile always copy hfiles if true - * @return List of filenames which were not found + * @return Map of LoadQueueItem to region * @throws TableNotFoundException if table does not yet exist */ - public List doBulkLoad(Map> map, final Admin admin, Table table, - RegionLocator regionLocator, boolean silence, boolean copyFile) - throws TableNotFoundException, IOException { + public Map doBulkLoad(Map> map, final Admin admin, + Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) + throws TableNotFoundException, IOException { if (!admin.isTableAvailable(regionLocator.getName())) { throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); } @@ -449,8 +449,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } - List performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, - Deque queue, ExecutorService pool, + Map performBulkLoad(final Admin admin, Table table, + RegionLocator regionLocator, Deque queue, ExecutorService pool, SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { int count = 0; @@ -464,8 +464,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // fs is the source filesystem fsDelegationToken.acquireDelegationToken(fs); bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); - Pair, List> pair = null; + Pair, Set> pair = null; + Map item2RegionMap = new HashMap<>(); // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { // need to reload split keys each iteration. @@ -493,7 +494,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + " hfiles to one family of one region"); } - bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile); + bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, + item2RegionMap); // NOTE: The next iteration's split / group could happen in parallel to // atomic bulkloads assuming that there are splits and no merges, and @@ -504,8 +506,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw new RuntimeException("Bulk load aborted with some files not yet loaded." + "Please check log for more details."); } - if (pair == null) return null; - return pair.getSecond(); + return item2RegionMap; } /** @@ -630,7 +631,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { pool = createExecutorService(); Multimap regionGroups = groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); - bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile); + bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null); } finally { if (pool != null) { pool.shutdown(); @@ -645,7 +646,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ protected void bulkLoadPhase(final Table table, final Connection conn, ExecutorService pool, Deque queue, - final Multimap regionGroups, boolean copyFile) throws IOException { + final Multimap regionGroups, boolean copyFile, + Map item2RegionMap) throws IOException { // atomically bulk load the groups. Set>> loadingFutures = new HashSet<>(); for (Entry> e: regionGroups.asMap().entrySet()){ @@ -660,6 +662,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return toRetry; } }; + if (item2RegionMap != null) { + for (LoadQueueItem lqi : lqis) { + item2RegionMap.put(lqi, e.getKey()); + } + } loadingFutures.add(pool.submit(call)); } @@ -668,6 +675,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List toRetry = future.get(); + if (item2RegionMap != null) { + for (LoadQueueItem lqi : toRetry) { + item2RegionMap.remove(lqi); + } + } // LQIs that are requeued to be regrouped. queue.addAll(toRetry); @@ -717,17 +729,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param pool the ExecutorService * @param queue the queue for LoadQueueItem * @param startEndKeys start and end keys - * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles. + * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. */ - private Pair, List> groupOrSplitPhase( + private Pair, Set> groupOrSplitPhase( final Table table, ExecutorService pool, Deque queue, final Pair startEndKeys) throws IOException { // need synchronized only within this scope of this // phase because of the puts that happen in futures. Multimap rgs = HashMultimap.create(); final Multimap regionGroups = Multimaps.synchronizedMultimap(rgs); - List missingHFiles = new ArrayList<>(); - Pair, List> pair = new Pair<>(regionGroups, + Set missingHFiles = new HashSet<>(); + Pair, Set> pair = new Pair<>(regionGroups, missingHFiles); // drain LQIs and figure out bulk load groups @@ -942,10 +954,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); } } - final ClientServiceCallable svrCallable = new ClientServiceCallable(conn, + final ClientServiceCallable svrCallable = new ClientServiceCallable(conn, tableName, first, rpcControllerFactory.newController()) { @Override - protected Boolean rpcCall() throws Exception { + protected byte[] rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; try { @@ -957,7 +969,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile); } - return success; + return success ? regionName : null; } finally { //Best effort copying of files that might not have been imported //from the staging directory back to original location @@ -999,10 +1011,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List toRetry = new ArrayList<>(); Configuration conf = getConf(); - boolean success = RpcRetryingCallerFactory.instantiate(conf, - null). newCaller() + byte[] region = RpcRetryingCallerFactory.instantiate(conf, + null). newCaller() .callWithRetries(svrCallable, Integer.MAX_VALUE); - if (!success) { + if (region == null) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) + " into table " + tableName + " with files " + lqis @@ -1193,7 +1205,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Table "+ tableName +" is available!!"); } - public List run(String dirPath, Map> map, TableName tableName) throws Exception{ + public Map run(String dirPath, Map> map, + TableName tableName) throws Exception{ initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { @@ -1236,8 +1249,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String dirPath = args[0]; TableName tableName = TableName.valueOf(args[1]); - List missingHFiles = run(dirPath, null, tableName); - if (missingHFiles == null) return 0; + Map loaded = run(dirPath, null, tableName); + if (loaded == null || !loaded.isEmpty()) return 0; return -1; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index fe7abcd6e65..ebc7c978581 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Locale; import java.util.Map; @@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; @@ -349,9 +352,13 @@ public class TestLoadIncrementalHFiles { String [] args= {dir.toString(), tableName.toString()}; if (useMap) { fs.delete(last); - List missingHFiles = loader.run(null, map, tableName); + Map loaded = loader.run(null, map, tableName); expectedRows -= 1000; - assertTrue(missingHFiles.contains(last.getName())); + for (LoadQueueItem item : loaded.keySet()) { + if (item.hfilePath.getName().equals(last.getName())) { + fail(last + " should be missing"); + } + } } else { loader.run(args); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index a1ed832b751..8337da80608 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Deque; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; @@ -360,14 +362,15 @@ public class TestLoadIncrementalHFilesSplitRecovery { @Override protected void bulkLoadPhase(final Table htable, final Connection conn, ExecutorService pool, Deque queue, - final Multimap regionGroups, boolean copyFile) + final Multimap regionGroups, boolean copyFile, + Map item2RegionMap) throws IOException { int i = attemptedCalls.incrementAndGet(); if (i == 1) { // On first attempt force a split. forceSplit(table); } - super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile); + super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); } };