HBASE-24221 Support bulkLoadHFile by family (#1569)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
niuyulin 2020-04-30 05:33:37 -05:00 committed by GitHub
parent b810c9bb91
commit 512d00e75d
3 changed files with 118 additions and 37 deletions

BulkLoadHFilesTool.java

@@ -119,6 +119,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
* Whether to run validation on hfiles before loading.
*/
private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
/**
* HBASE-24221 Support bulkLoadHFile by family, to avoid bulkLoadHFile waiting a long time on
* compactions running at the server side
*/
public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is an invalid family name.
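
The new property defaults to false, so existing callers keep the one-RPC-per-region behavior. Below is a minimal sketch of enabling it from client code through the public BulkLoadHFiles entry point; the table name "demo" and the staging path "/staging/output" are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class BulkLoadByFamilyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Issue one bulk-load call per column family instead of one per region.
    conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
    // Placeholder table name and HFile staging directory.
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("demo"),
      new Path("/staging/output"));
  }
}

The trade-off: splitting a region's load across several calls means the families of a row are no longer loaded atomically, in exchange for smaller requests that are less likely to stall behind a server-side compaction.
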
@@ -126,6 +131,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
private final int maxFilesPerRegionPerFamily;
private final boolean assignSeqIds;
private boolean bulkLoadByFamily;
// Source delegation token
private final FsDelegationToken fsDelegationToken;
@@ -146,8 +152,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
}
// Initialize a thread pool
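
Because bulkLoadByFamily is captured here in the constructor, the flag must be set on the Configuration before the tool is created; later changes to the Configuration do not affect an existing instance. A short illustration of the pitfall:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class FlagTimingPitfall {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    BulkLoadHFiles loader = BulkLoadHFiles.create(conf); // flag is read here (false)
    // Too late: `loader` already captured bulkLoadByFamily = false.
    conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
  }
}
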
@@ -362,6 +369,54 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
}
}
/**
* Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
* hfiles that need to be retried. If it is successful, it returns an empty list. NOTE: to
* maintain row atomicity guarantees, the region server side should succeed or fail atomically.
* @return empty list on success, list of items to retry on recoverable failure
*/
private CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
final byte[] first, Collection<LoadQueueItem> lqis) {
List<Pair<byte[], String>> familyPaths =
lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils
.addListener(
conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
(loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server", error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
&& numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+ numRetries.incrementAndGet());
// return lqi's to retry
future.complete(lqis);
} else {
LOG.error(RETRY_ON_IO_EXCEPTION
+ " is disabled or we have reached retry limit. Unable to recover");
future.completeExceptionally(error);
}
} else {
if (loaded) {
future.complete(Collections.emptyList());
} else {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
+ " into table " + tableName + " with files " + lqis
+ " failed. This is recoverable and they will be retried.");
// return lqi's to retry
future.complete(lqis);
}
}
});
return future;
}
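
Note the contract of the future returned above: a recoverable failure completes it with the items to re-queue rather than completing it exceptionally, and only an exhausted retry budget surfaces the server error. A hedged sketch of a caller draining such futures under that contract (the RetryDrain helper is hypothetical, not part of this patch):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.hadoop.hbase.tool.BulkLoadHFiles.LoadQueueItem;

final class RetryDrain {
  // Collects every item that needs another pass: an empty result means all
  // groups loaded; an ExecutionException means retries were exhausted.
  static List<LoadQueueItem> drain(List<Future<Collection<LoadQueueItem>>> futures)
      throws Exception {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    for (Future<Collection<LoadQueueItem>> f : futures) {
      toRetry.addAll(f.get());
    }
    return toRetry;
  }
}
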
/**
* This takes the LQIs grouped by likely regions and attempts to bulk load them. Any failures are
* re-queued for another pass with the groupOrSplitPhase.
@@ -375,43 +430,15 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
// atomically bulk load the groups.
List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
.entrySet()) {
byte[] first = entry.getKey().array();
final Collection<LoadQueueItem> lqis = entry.getValue();
if (bulkLoadByFamily) {
groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
} else {
loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
}
if (item2RegionMap != null) {
for (LoadQueueItem lqi : lqis) {
item2RegionMap.put(lqi, entry.getKey());
@@ -447,6 +474,14 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool
}
}
private Map<byte[], Collection<LoadQueueItem>>
groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
itemsInRegion.forEach(item -> families2Queue
.computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
return families2Queue;
}
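
groupByFamilies keys its map on raw byte[] family names, which is why it needs a TreeMap with Bytes.BYTES_COMPARATOR: a HashMap keyed on arrays compares by object identity and would never merge items of the same family. A self-contained toy (the class name is illustrative) showing the difference:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.util.Bytes;

public class ByteArrayKeyDemo {
  public static void main(String[] args) {
    byte[] a = Bytes.toBytes("cf");
    byte[] b = Bytes.toBytes("cf"); // same bytes, different array instance

    Map<byte[], Integer> hash = new HashMap<>();
    hash.put(a, 1);
    System.out.println(hash.containsKey(b)); // false: identity-based equality

    Map<byte[], Integer> tree = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    tree.put(a, 1);
    System.out.println(tree.containsKey(b)); // true: comparator inspects contents
  }
}
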
private boolean checkHFilesCountPerRegionPerFamily(
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {

TestBulkLoadHFiles.java

@@ -102,6 +102,7 @@ public class TestBulkLoadHFiles {
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodecWithTags.class.getCanonicalName());
util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
util.startMiniCluster();
setupNamespace();

TestBulkLoadHFilesByFamily.java (new file)

@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.tool;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.junit.BeforeClass;
import org.junit.ClassRule;
public class TestBulkLoadHFilesByFamily extends TestBulkLoadHFiles {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBulkLoadHFilesByFamily.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodecWithTags.class.getCanonicalName());
util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
util.startMiniCluster();
setupNamespace();
}
}