diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 2b674893355..b42fbbefb8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -119,6 +119,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    * Whether to run validation on hfiles before loading.
    */
   private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
+  /**
+   * HBASE-24221 Support bulk loading HFiles by family, to avoid a long wait in bulkLoadHFile
+   * caused by server-side compaction.
+   */
+  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -126,6 +131,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private final int maxFilesPerRegionPerFamily;
   private final boolean assignSeqIds;
+  private boolean bulkLoadByFamily;
 
   // Source delegation token
   private final FsDelegationToken fsDelegationToken;
@@ -146,8 +152,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
-    nrThreads =
-      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
+    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+      Runtime.getRuntime().availableProcessors());
+    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
   }
 
   // Initialize a thread pool
@@ -362,6 +369,54 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     }
   }
 
+  /**
+   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
+   * hfiles that need to be retried. If it is successful, it will return an empty list. NOTE: to
+   * maintain row atomicity guarantees, the region server side should succeed or fail
+   * atomically.
+   * @return empty list on success, list of items to retry on recoverable failure
+   */
+  private CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
+      final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
+      final byte[] first, Collection<LoadQueueItem> lqis) {
+    List<Pair<byte[], String>> familyPaths =
+      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+        .collect(Collectors.toList());
+    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
+    FutureUtils
+      .addListener(
+        conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
+          fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+        (loaded, error) -> {
+          if (error != null) {
+            LOG.error("Encountered unrecoverable error from region server", error);
+            if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
+                && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+                  HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+              LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+                  + numRetries.incrementAndGet());
+              // return lqi's to retry
+              future.complete(lqis);
+            } else {
+              LOG.error(RETRY_ON_IO_EXCEPTION
+                  + " is disabled or we have reached retry limit. Unable to recover");
+              future.completeExceptionally(error);
+            }
+          } else {
+            if (loaded) {
+              future.complete(Collections.emptyList());
+            } else {
+              LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
+                  + " into table " + tableName + " with files " + lqis
+                  + " failed. This is recoverable and they will be retried.");
+              // return lqi's to retry
+              future.complete(lqis);
+            }
+          }
+        });
+    return future;
+  }
+
   /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
    * re-queued for another pass with the groupOrSplitPhase.
@@ -375,43 +430,15 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     // atomically bulk load the groups.
     List<CompletableFuture<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
-        .entrySet()) {
+      .entrySet()) {
       byte[] first = entry.getKey().array();
       final Collection<LoadQueueItem> lqis = entry.getValue();
-      List<Pair<byte[], String>> familyPaths =
-        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
-          .collect(Collectors.toList());
-      CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
-      FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-        fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
-        (loaded, error) -> {
-          if (error != null) {
-            LOG.error("Encountered unrecoverable error from region server", error);
-            if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
-              numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
-              LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
-                numRetries.incrementAndGet());
-              // return lqi's to retry
-              future.complete(lqis);
-            } else {
-              LOG.error(RETRY_ON_IO_EXCEPTION +
-                " is disabled or we have reached retry limit. Unable to recover");
-              future.completeExceptionally(error);
-            }
-          } else {
-            if (loaded) {
-              future.complete(Collections.emptyList());
-            } else {
-              LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
-                " into table " + tableName + " with files " + lqis +
-                " failed. This is recoverable and they will be retried.");
-              // return lqi's to retry
-              future.complete(lqis);
-            }
-          }
-        });
-      loadingFutures.add(future);
+      if (bulkLoadByFamily) {
+        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
+          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
+      } else {
+        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
+      }
       if (item2RegionMap != null) {
         for (LoadQueueItem lqi : lqis) {
           item2RegionMap.put(lqi, entry.getKey());
@@ -447,6 +474,14 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     }
   }
 
+  private Map<byte[], Collection<LoadQueueItem>>
+      groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
+    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    itemsInRegion.forEach(item -> families2Queue
+      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
+    return families2Queue;
+  }
+
   private boolean checkHFilesCountPerRegionPerFamily(
       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
     for (Map.Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index 9642b8fce67..6d0e9142f6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -102,6 +102,7 @@ public class TestBulkLoadHFiles {
     // change default behavior so that tag values are returned with normal rpcs
     util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
       KeyValueCodecWithTags.class.getCanonicalName());
+    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
     util.startMiniCluster();
 
     setupNamespace();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
new file mode 100644
index 00000000000..42a81a95435
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
+public class TestBulkLoadHFilesByFamily extends TestBulkLoadHFiles {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadHFilesByFamily.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+      KeyValueCodecWithTags.class.getCanonicalName());
+    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+    util.startMiniCluster();
+    setupNamespace();
+  }
+}
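
--
For reference, a minimal sketch of how a client could opt into the new behavior, assuming the existing BulkLoadHFiles client API; the table name and staging directory below are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class BulkLoadByFamilyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // With this flag set, each column family's HFiles are sent to the region
    // server in a separate bulkLoad call, so a compaction triggered by loading
    // one family does not stall the files of the other families (HBASE-24221).
    conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
    // Hypothetical table name and HFile staging directory.
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("my_table"),
      new Path("hdfs:///staging/my_table"));
  }
}

Note the trade-off this flag presumably opts into: since each family is loaded in its own call to tryAtomicRegionLoad, the families of a region are no longer committed in one atomic request, whereas the default path loads all of a region's families together.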