HBASE-25281 Bulkload split hfile too many times due to unreasonable split point (#2667)
Signed-off-by: Guanghao Zhang <zghao@apache.org> Signed-off-by: Wellington Chevreuil <wellington.chevreuil@gmail.com>
This commit is contained in:
parent
c9156e7891
commit
a307d706c8
|
@ -615,6 +615,45 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
return lqis;
|
return lqis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param startEndKeys the start/end keys of the regions belonging to this table, sorted in ascending
|
||||||
|
* order by start key
|
||||||
|
* @param key the key whose containing region is to be found
|
||||||
|
* @return region index
|
||||||
|
*/
|
||||||
|
private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) {
|
||||||
|
int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW),
|
||||||
|
(p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
|
||||||
|
if (idx < 0) {
|
||||||
|
// not on boundary, returns -(insertion index). Calculate region it
|
||||||
|
// would be in.
|
||||||
|
idx = -(idx + 1) - 1;
|
||||||
|
}
|
||||||
|
return idx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We consider there to be a region hole or overlap under the following conditions: 1) if idx < 0, then
|
||||||
|
* first region info is lost. 2) if the endkey of a region is not equal to the startkey of the
|
||||||
|
* next region. 3) if the endkey of the last region is not empty.
|
||||||
|
*/
|
||||||
|
private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys,
|
||||||
|
TableName tableName) throws IOException {
|
||||||
|
if (idx < 0) {
|
||||||
|
throw new IOException("The first region info for table " + tableName
|
||||||
|
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||||
|
} else if ((idx == startEndKeys.size() - 1)
|
||||||
|
&& !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY)) {
|
||||||
|
throw new IOException("The last region info for table " + tableName
|
||||||
|
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
||||||
|
} else if (idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(),
|
||||||
|
startEndKeys.get(idx + 1).getFirst()) == 0)) {
|
||||||
|
throw new IOException("The endkey of one region for table " + tableName
|
||||||
|
+ " is not equal to the startkey of the next region in hbase:meta."
|
||||||
|
+ "Please use hbck tool to fix it first.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempt to assign the given load queue item into its target region group. If the hfile boundary
|
* Attempt to assign the given load queue item into its target region group. If the hfile boundary
|
||||||
* no longer fits into a region, physically splits the hfile such that the new bottom half will
|
* no longer fits into a region, physically splits the hfile such that the new bottom half will
|
||||||
|
@ -647,51 +686,30 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (Bytes.compareTo(first.get(), last.get()) > 0) {
|
if (Bytes.compareTo(first.get(), last.get()) > 0) {
|
||||||
throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
|
throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
|
||||||
" > " + Bytes.toStringBinary(last.get()));
|
+ " > " + Bytes.toStringBinary(last.get()));
|
||||||
}
|
}
|
||||||
int idx =
|
int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
|
||||||
Collections.binarySearch(startEndKeys, Pair.newPair(first.get(), HConstants.EMPTY_END_ROW),
|
checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
|
||||||
(p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
|
boolean lastKeyInRange =
|
||||||
if (idx < 0) {
|
Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
|
||||||
// not on boundary, returns -(insertion index). Calculate region it
|
.equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
|
||||||
// would be in.
|
|
||||||
idx = -(idx + 1) - 1;
|
|
||||||
}
|
|
||||||
int indexForCallable = idx;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* we can consider there is a region hole in following conditions. 1) if idx < 0,then first
|
|
||||||
* region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
|
|
||||||
* region. 3) if the endkey of the last region is not empty.
|
|
||||||
*/
|
|
||||||
if (indexForCallable < 0) {
|
|
||||||
throw new IOException("The first region info for table " + tableName +
|
|
||||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
|
||||||
} else if ((indexForCallable == startEndKeys.size() - 1) &&
|
|
||||||
!Bytes.equals(startEndKeys.get(indexForCallable).getSecond(), HConstants.EMPTY_BYTE_ARRAY)) {
|
|
||||||
throw new IOException("The last region info for table " + tableName +
|
|
||||||
" can't be found in hbase:meta.Please use hbck tool to fix it first.");
|
|
||||||
} else if (indexForCallable + 1 < startEndKeys.size() &&
|
|
||||||
!(Bytes.compareTo(startEndKeys.get(indexForCallable).getSecond(),
|
|
||||||
startEndKeys.get(indexForCallable + 1).getFirst()) == 0)) {
|
|
||||||
throw new IOException("The endkey of one region for table " + tableName +
|
|
||||||
" is not equal to the startkey of the next region in hbase:meta." +
|
|
||||||
"Please use hbck tool to fix it first.");
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.get(idx).getSecond()) < 0 ||
|
|
||||||
Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
|
|
||||||
if (!lastKeyInRange) {
|
if (!lastKeyInRange) {
|
||||||
Pair<byte[], byte[]> startEndKey = startEndKeys.get(indexForCallable);
|
int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
|
||||||
List<LoadQueueItem> lqis =
|
int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
|
||||||
splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)),
|
// make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger
|
||||||
startEndKey.getSecond());
|
// than hfile.endkey w/o this check
|
||||||
|
if (splitIdx != firstKeyRegionIdx) {
|
||||||
|
checkRegionIndexValid(splitIdx, startEndKeys, tableName);
|
||||||
|
}
|
||||||
|
byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
|
||||||
|
List<LoadQueueItem> lqis = splitStoreFile(item,
|
||||||
|
FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
|
||||||
return new Pair<>(lqis, null);
|
return new Pair<>(lqis, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// group regions.
|
// group regions.
|
||||||
regionGroups.put(ByteBuffer.wrap(startEndKeys.get(idx).getFirst()), item);
|
regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -407,6 +407,33 @@ public class TestBulkLoadHFilesSplitRecovery {
|
||||||
assertEquals(20, countedLqis.get());
|
assertEquals(20, countedLqis.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorrectSplitPoint() throws Exception {
|
||||||
|
final TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
|
||||||
|
Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
|
||||||
|
Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
|
||||||
|
Bytes.toBytes("row_00000070") };
|
||||||
|
setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);
|
||||||
|
|
||||||
|
final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
|
||||||
|
BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
|
||||||
|
Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
|
||||||
|
boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
|
||||||
|
bulkloadRpcTimes.addAndGet(1);
|
||||||
|
super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Path dir = buildBulkFiles(table, 1);
|
||||||
|
loader.bulkLoad(table, dir);
|
||||||
|
// before HBASE-25281 we need invoke bulkload rpc 8 times
|
||||||
|
assertEquals(4, bulkloadRpcTimes.get());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test creates a table with many small regions. The bulk load files would be splitted
|
* This test creates a table with many small regions. The bulk load files would be splitted
|
||||||
* multiple times before all of them can be loaded successfully.
|
* multiple times before all of them can be loaded successfully.
|
||||||
|
|
Loading…
Reference in New Issue