diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index 702ed750466..c7fe1e24a1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -37,11 +37,16 @@ public interface BulkLoadHFiles {
 
   static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
   static final String MAX_FILES_PER_REGION_PER_FAMILY =
-      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
+    "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   static final String CREATE_TABLE_CONF_KEY = "create.table";
   static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
   static final String ALWAYS_COPY_FILES = "always.copy.files";
+  /**
+   * HBASE-24221 Support bulkLoadHFile by family to avoid long time waiting of bulkLoadHFile because
+   * of compacting at server side
+   */
+  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
 
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
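The new constant is the only public surface of this change: the flag is plain client-side configuration and defaults to off, so existing callers keep the single-call-per-region behavior. A minimal sketch of opting in before running the tool, mirroring how the test below drives it (the staging directory and table name here are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

public class BulkLoadByFamilyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // One bulk-load RPC per column family per region instead of one per region;
    // defaults to false, preserving the old behavior.
    conf.setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true);
    // Hypothetical HFile staging directory and table name.
    new LoadIncrementalHFiles(conf).run(new String[] { "/tmp/hfiles", "myTable" });
  }
}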
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 7b306f4162c..8e634435eec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -148,6 +148,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private final int maxFilesPerRegionPerFamily;
   private final boolean assignSeqIds;
+  private boolean bulkLoadByFamily;
 
   // Source delegation token
   private final FsDelegationToken fsDelegationToken;
@@ -190,6 +191,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+    bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
     nrThreads = conf.getInt("hbase.loadincremental.threads.max",
       Runtime.getRuntime().availableProcessors());
     numRetries = new AtomicInteger(0);
@@ -465,6 +467,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     return item2RegionMap;
   }
 
+  private Map<byte[], Collection<LoadQueueItem>>
+      groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
+    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    itemsInRegion.forEach(item -> families2Queue
+        .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
+    return families2Queue;
+  }
+
   /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
    * re-queued for another pass with the groupOrSplitPhase.
@@ -481,24 +491,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         .entrySet()) {
       byte[] first = e.getKey().array();
       Collection<LoadQueueItem> lqis = e.getValue();
-
-      ClientServiceCallable<byte[]> serviceCallable =
-          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
-
-      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
-        @Override
-        public List<LoadQueueItem> call() throws Exception {
-          List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
-          return toRetry;
-        }
-      };
       if (item2RegionMap != null) {
         for (LoadQueueItem lqi : lqis) {
           item2RegionMap.put(lqi, e.getKey());
         }
       }
-      loadingFutures.add(pool.submit(call));
+      if (bulkLoadByFamily) {
+        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures.add(pool.submit(
+          () -> tryAtomicRegionLoad(conn, table.getName(), first, familyQueue, copyFile))));
+      } else {
+        loadingFutures.add(
+          pool.submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile)));
+      }
     }
 
     // get all the results.
@@ -801,10 +805,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @return empty list if success, list of items to retry on recoverable failure
    */
  @VisibleForTesting
-  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
-      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
-      throws IOException {
+  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
+      boolean copyFile) throws IOException {
     List<LoadQueueItem> toRetry = new ArrayList<>();
+    ClientServiceCallable<byte[]> serviceCallable =
+        buildClientServiceCallable(conn, tableName, first, lqis, copyFile);
     try {
       Configuration conf = getConf();
       byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
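The core of the change is groupByFamilies plus the branch in bulkLoadPhase: queue items that grouped to the same region are re-bucketed by family key, and each bucket is submitted to the pool as its own tryAtomicRegionLoad call. A self-contained sketch of the same grouping idiom, assuming a stand-in Item record for LoadQueueItem and java.util.Arrays::compare in place of HBase's Bytes.BYTES_COMPARATOR:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByFamilySketch {
  // Stand-in for LoadQueueItem: a family key plus an HFile path.
  record Item(byte[] family, String hfilePath) {}

  static Map<byte[], Collection<Item>> groupByFamilies(Collection<Item> itemsInRegion) {
    // byte[] has no natural ordering, so the TreeMap needs an explicit
    // comparator; the patch uses Bytes.BYTES_COMPARATOR for the same reason.
    Map<byte[], Collection<Item>> families2Queue = new TreeMap<>(Arrays::compare);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.family(), k -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  public static void main(String[] args) {
    byte[] cf1 = "cf1".getBytes(StandardCharsets.UTF_8);
    byte[] cf2 = "cf2".getBytes(StandardCharsets.UTF_8);
    List<Item> region = List.of(new Item(cf1, "a"), new Item(cf2, "b"), new Item(cf1, "c"));
    // Two buckets: cf1 -> [a, c], cf2 -> [b]; each would become one RPC.
    groupByFamilies(region).forEach((fam, items) ->
      System.out.println(new String(fam, StandardCharsets.UTF_8) + " -> " + items.size()));
  }
}

A sorted map with a content-based comparator also matters because arrays use identity equals/hashCode: a HashMap keyed by byte[] would silently create one bucket per item.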
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 8d3d04dfa38..e1ed7688332 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -21,14 +21,15 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -64,7 +66,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -755,4 +756,43 @@ public class TestLoadIncrementalHFiles {
       }
     }
   }
+
+  @Test
+  public void testBulkLoadByFamily() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    String tableName = tn.getMethodName();
+    String[] families = { "cf1", "cf2", "cf3" };
+    for (int i = 0; i < families.length; i++) {
+      byte[] from = Bytes.toBytes(i + "begin");
+      byte[] to = Bytes.toBytes(i + "end");
+      Path familyDir = new Path(dir, families[i]);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
+        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
+    }
+    Table table = util.createTable(TableName.valueOf(tableName), families);
+    final AtomicInteger attmptedCalls = new AtomicInteger();
+    util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection,
+          TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
+          boolean copyFile) throws IOException {
+        attmptedCalls.incrementAndGet();
+        return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile);
+      }
+    };
+
+    String[] args = { dir.toString(), tableName };
+    try {
+      loader.run(args);
+      assertEquals(families.length, attmptedCalls.get());
+      assertEquals(1000 * families.length, util.countRows(table));
+    } finally {
+      if (null != table) {
+        table.close();
+      }
+      util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
+    }
+  }
 }
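The arithmetic behind the assertions: three families with one HFile each land in the single region of the freshly created table, so the by-family path submits exactly families.length tryAtomicRegionLoad calls, and each HFile carries 1000 rows, giving 1000 * families.length rows total. With the flag at its default of false, the same load would fall through to the else branch in bulkLoadPhase and arrive as one call for the whole region.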
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
index aae529b72be..05c2457aa9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -287,24 +287,23 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     setupTable(connection, table, 10);
     LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
       @Override
-      protected List<LoadQueueItem> tryAtomicRegionLoad(
-          ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
-          Collection<LoadQueueItem> lqis) throws IOException {
+      protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection,
+          TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
+          boolean copyFile) throws IOException {
         int i = attmptedCalls.incrementAndGet();
         if (i == 1) {
           Connection errConn;
           try {
             errConn = getMockedConnection(util.getConfiguration());
-            serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
           } catch (Exception e) {
             LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
             throw new RuntimeException("mocking cruft, should never happen");
           }
           failedCalls.incrementAndGet();
-          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+          return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true);
         }
-        return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+        return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true);
       }
     };
 
     try {
@@ -337,23 +336,21 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
     final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
       @Override
-      protected List<LoadQueueItem> tryAtomicRegionLoad(
-          ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
-          Collection<LoadQueueItem> lqis) throws IOException {
-        if (calls.get() < util.getConfiguration().getInt(
-            HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
-          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
-              tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
+      protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
+          TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+        if (calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+          calls.getAndIncrement();
+          return new ClientServiceCallable<byte[]>(conn, tableName, first,
+              new RpcControllerFactory(util.getConfiguration()).newController(),
               HConstants.PRIORITY_UNSET) {
             @Override
            public byte[] rpcCall() throws Exception {
               throw new IOException("Error calling something on RegionServer");
             }
           };
-          calls.getAndIncrement();
-          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
         } else {
-          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+          return super.buildClientServiceCallable(conn, tableName, first, lqis, true);
         }
       }
     };
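Two consequences of the refactor show up in these test changes. First, tryAtomicRegionLoad now receives a Connection and builds its ClientServiceCallable internally, so fault-injecting tests override buildClientServiceCallable (as the retry test above does) rather than handing in a pre-built callable. Second, when the flag is on, one region's bulk load becomes several independent RPCs, so a failure in one family's call is no longer atomic with the others; the flag trades per-region atomicity for shorter server-side calls that are less likely to stall behind a compaction, which is presumably why it defaults to false.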