HBASE-25281 Bulkload split hfile too many times due to unreasonable split point (#2692)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
d523d758af
commit
8e3727ea06
|
@ -722,6 +722,45 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
return lqis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param startEndKeys the start/end keys of regions belong to this table, the list in ascending
|
||||
* order by start key
|
||||
* @param key the key need to find which region belong to
|
||||
* @return region index
|
||||
*/
|
||||
private int getRegionIndex(final Pair<byte[][], byte[][]> startEndKeys, byte[] key) {
|
||||
int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, Bytes.BYTES_COMPARATOR);
|
||||
if (idx < 0) {
|
||||
// not on boundary, returns -(insertion index). Calculate region it
|
||||
// would be in.
|
||||
idx = -(idx + 1) - 1;
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
/**
|
||||
* we can consider there is a region hole in following conditions. 1) if idx < 0,then first
|
||||
* region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
|
||||
* region. 3) if the endkey of the last region is not empty.
|
||||
*/
|
||||
private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]> startEndKeys,
|
||||
TableName tableName) throws IOException {
|
||||
if (idx < 0) {
|
||||
throw new IOException("The first region info for table " + tableName +
|
||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if ((idx == startEndKeys.getFirst().length - 1) &&
|
||||
!Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY)) {
|
||||
throw new IOException("The last region info for table " + tableName +
|
||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if (idx + 1 < startEndKeys.getFirst().length &&
|
||||
!(Bytes.compareTo(startEndKeys.getSecond()[idx],
|
||||
startEndKeys.getFirst()[idx + 1]) == 0)) {
|
||||
throw new IOException("The endkey of one region for table " + tableName +
|
||||
" is not equal to the startkey of the next region in hbase:meta." +
|
||||
"Please use hbck tool to fix it first.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to assign the given load queue item into its target region group. If the hfile boundary
|
||||
* no longer fits into a region, physically splits the hfile such that the new bottom half will
|
||||
|
@ -745,8 +784,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
return new Pair<>(null, hfilePath.getName());
|
||||
}
|
||||
|
||||
LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
|
||||
" last=" + last.map(Bytes::toStringBinary));
|
||||
LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
|
||||
+ " last=" + last.map(Bytes::toStringBinary));
|
||||
if (!first.isPresent() || !last.isPresent()) {
|
||||
assert !first.isPresent() && !last.isPresent();
|
||||
// TODO what if this is due to a bad HFile?
|
||||
|
@ -754,47 +793,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
return null;
|
||||
}
|
||||
if (Bytes.compareTo(first.get(), last.get()) > 0) {
|
||||
throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
|
||||
" > " + Bytes.toStringBinary(last.get()));
|
||||
}
|
||||
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
|
||||
if (idx < 0) {
|
||||
// not on boundary, returns -(insertion index). Calculate region it
|
||||
// would be in.
|
||||
idx = -(idx + 1) - 1;
|
||||
}
|
||||
int indexForCallable = idx;
|
||||
|
||||
/**
|
||||
* we can consider there is a region hole in following conditions. 1) if idx < 0,then first
|
||||
* region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
|
||||
* region. 3) if the endkey of the last region is not empty.
|
||||
*/
|
||||
if (indexForCallable < 0) {
|
||||
throw new IOException("The first region info for table " + table.getName() +
|
||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
|
||||
!Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
|
||||
throw new IOException("The last region info for table " + table.getName() +
|
||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||
} else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
|
||||
!(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
|
||||
startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
|
||||
throw new IOException("The endkey of one region for table " + table.getName() +
|
||||
" is not equal to the startkey of the next region in hbase:meta." +
|
||||
"Please use hbck tool to fix it first.");
|
||||
throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
|
||||
+ " > " + Bytes.toStringBinary(last.get()));
|
||||
}
|
||||
|
||||
boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
|
||||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
|
||||
int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
|
||||
checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, table.getName());
|
||||
boolean lastKeyInRange =
|
||||
Bytes.compareTo(last.get(), startEndKeys.getSecond()[firstKeyRegionIdx]) < 0 || Bytes
|
||||
.equals(startEndKeys.getSecond()[firstKeyRegionIdx], HConstants.EMPTY_BYTE_ARRAY);
|
||||
if (!lastKeyInRange) {
|
||||
int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
|
||||
int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) >>> 1;
|
||||
// make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger
|
||||
// than hfile.endkey w/o this check
|
||||
if (splitIdx != firstKeyRegionIdx) {
|
||||
checkRegionIndexValid(splitIdx, startEndKeys, table.getName());
|
||||
}
|
||||
List<LoadQueueItem> lqis = splitStoreFile(item, table,
|
||||
startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
|
||||
startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx]);
|
||||
return new Pair<>(lqis, null);
|
||||
}
|
||||
|
||||
// group regions.
|
||||
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
|
||||
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[firstKeyRegionIdx]), item);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -470,6 +470,34 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCorrectSplitPoint() throws Exception {
|
||||
final TableName table = TableName.valueOf(name.getMethodName());
|
||||
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
|
||||
Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
|
||||
Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
|
||||
Bytes.toBytes("row_00000070") };
|
||||
setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);
|
||||
|
||||
final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
|
||||
BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
|
||||
|
||||
@Override
|
||||
protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
|
||||
Deque<LoadIncrementalHFiles.LoadQueueItem> queue,
|
||||
Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem> regionGroups, boolean copyFile,
|
||||
Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
|
||||
bulkloadRpcTimes.addAndGet(1);
|
||||
super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
|
||||
}
|
||||
};
|
||||
|
||||
Path dir = buildBulkFiles(table, 1);
|
||||
loader.bulkLoad(table, dir);
|
||||
// before HBASE-25281 we need invoke bulkload rpc 8 times
|
||||
assertEquals(4, bulkloadRpcTimes.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test creates a table with many small regions. The bulk load files would be splitted
|
||||
* multiple times before all of them can be loaded successfully.
|
||||
|
|
Loading…
Reference in New Issue