HBASE-22989 : null check for item2RegionMap while removing LoadQueueItem (#596)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
693f201f3d
commit
cb976b0306
|
@ -413,8 +413,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
try {
|
try {
|
||||||
Collection<LoadQueueItem> toRetry = future.get();
|
Collection<LoadQueueItem> toRetry = future.get();
|
||||||
|
|
||||||
for (LoadQueueItem lqi : toRetry) {
|
if (item2RegionMap != null) {
|
||||||
item2RegionMap.remove(lqi);
|
for (LoadQueueItem lqi : toRetry) {
|
||||||
|
item2RegionMap.remove(lqi);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// LQIs that are requeued to be regrouped.
|
// LQIs that are requeued to be regrouped.
|
||||||
queue.addAll(toRetry);
|
queue.addAll(toRetry);
|
||||||
|
|
|
@ -464,6 +464,55 @@ public class TestBulkLoadHFilesSplitRecovery {
|
||||||
loader.bulkLoad(tableName, dir);
|
loader.bulkLoad(tableName, dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We are testing a split after initial validation but before the atomic bulk load call.
|
||||||
|
* We cannot use presplitting to test this path, so we actually inject a
|
||||||
|
* split just before the atomic region load. However, we will pass null item2RegionMap
|
||||||
|
* and that should not affect the bulk load behavior.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception {
|
||||||
|
final TableName table = TableName.valueOf(name.getMethodName());
|
||||||
|
setupTable(util.getConnection(), table, 10);
|
||||||
|
populateTable(util.getConnection(), table, 1);
|
||||||
|
assertExpectedTable(table, ROWCOUNT, 1);
|
||||||
|
|
||||||
|
// Now let's cause trouble. This will occur after checks and cause bulk
|
||||||
|
// files to fail when attempt to atomically import. This is recoverable.
|
||||||
|
final AtomicInteger attemptedCalls = new AtomicInteger();
|
||||||
|
BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName,
|
||||||
|
final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
|
||||||
|
final boolean copyFiles,
|
||||||
|
final Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
|
||||||
|
|
||||||
|
int i = attemptedCalls.incrementAndGet();
|
||||||
|
if (i == 1) {
|
||||||
|
// On first attempt force a split.
|
||||||
|
forceSplit(table);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Passing item2RegionMap null
|
||||||
|
// In the absence of LoadQueueItem, bulk load should work as expected
|
||||||
|
super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
// create HFiles for different column families
|
||||||
|
Path dir = buildBulkFiles(table, 2);
|
||||||
|
loader.bulkLoad(table, dir);
|
||||||
|
|
||||||
|
// check that data was loaded
|
||||||
|
// The three expected attempts are 1) failure because need to split, 2)
|
||||||
|
// load of split top 3) load of split bottom
|
||||||
|
assertEquals(3, attemptedCalls.get());
|
||||||
|
assertExpectedTable(table, ROWCOUNT, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks that all columns have the expected value and that there is the expected number of rows.
|
* Checks that all columns have the expected value and that there is the expected number of rows.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue