diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index ac159b4d986..5a92acef8d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -74,12 +74,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { bufferedSize = 0L; Iterator> toCompleteIter = toComplete.iterator(); for (CompletableFuture future : table.batch(toSend)) { + CompletableFuture toCompleteFuture = toCompleteIter.next(); future.whenComplete((r, e) -> { - CompletableFuture f = toCompleteIter.next(); if (e != null) { - f.completeExceptionally(e); + toCompleteFuture.completeExceptionally(e); } else { - f.complete(null); + toCompleteFuture.complete(null); } }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index 54937724481..6a5a00e7947 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -51,6 +51,8 @@ public class TestAsyncBufferMutator { private static TableName TABLE_NAME = TableName.valueOf("async"); + private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region"); + private static byte[] CF = Bytes.toBytes("cf"); private static byte[] CQ = Bytes.toBytes("cq"); @@ -65,6 +67,7 @@ public class TestAsyncBufferMutator { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, CF); + TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ThreadLocalRandom.current().nextBytes(VALUE); } @@ -76,10 +79,19 @@ public class TestAsyncBufferMutator { } @Test - public void test() throws InterruptedException { + public void testWithMultiRegionTable() throws InterruptedException { + test(MULTI_REGION_TABLE_NAME); + } + + @Test + public void testWithSingleRegionTable() throws InterruptedException { + test(TABLE_NAME); + } + + private void test(TableName tableName) throws InterruptedException { List> futures = new ArrayList<>(); try (AsyncBufferedMutator mutator = - CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) { + CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) { List> fs = mutator.mutate(IntStream.range(0, COUNT / 2) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) .collect(Collectors.toList())); @@ -96,7 +108,7 @@ public class TestAsyncBufferMutator { } // mutator.close will call mutator.flush automatically so all tasks should have been done. futures.forEach(f -> f.join()); - AsyncTable table = CONN.getTable(TABLE_NAME); + AsyncTable table = CONN.getTable(tableName); IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join()) .forEach(r -> { assertArrayEquals(VALUE, r.getValue(CF, CQ));