HBASE-19522 The complete order may be wrong in AsyncBufferedMutatorImpl

This commit is contained in:
Guanghao Zhang 2017-12-15 19:39:16 +08:00
parent 15a4aa0ce1
commit 8fa08b56f3
2 changed files with 18 additions and 6 deletions

View File

@ -74,12 +74,12 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
bufferedSize = 0L; bufferedSize = 0L;
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator(); Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) { for (CompletableFuture<?> future : table.batch(toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
future.whenComplete((r, e) -> { future.whenComplete((r, e) -> {
CompletableFuture<Void> f = toCompleteIter.next();
if (e != null) { if (e != null) {
f.completeExceptionally(e); toCompleteFuture.completeExceptionally(e);
} else { } else {
f.complete(null); toCompleteFuture.complete(null);
} }
}); });
} }

View File

@ -51,6 +51,8 @@ public class TestAsyncBufferMutator {
private static TableName TABLE_NAME = TableName.valueOf("async"); private static TableName TABLE_NAME = TableName.valueOf("async");
private static TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
private static byte[] CF = Bytes.toBytes("cf"); private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq"); private static byte[] CQ = Bytes.toBytes("cq");
@ -65,6 +67,7 @@ public class TestAsyncBufferMutator {
public static void setUp() throws Exception { public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, CF); TEST_UTIL.createTable(TABLE_NAME, CF);
TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
ThreadLocalRandom.current().nextBytes(VALUE); ThreadLocalRandom.current().nextBytes(VALUE);
} }
@ -76,10 +79,19 @@ public class TestAsyncBufferMutator {
} }
@Test @Test
public void test() throws InterruptedException { public void testWithMultiRegionTable() throws InterruptedException {
test(MULTI_REGION_TABLE_NAME);
}
@Test
public void testWithSingleRegionTable() throws InterruptedException {
test(TABLE_NAME);
}
private void test(TableName tableName) throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>(); List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator = try (AsyncBufferedMutator mutator =
CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) { CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2) List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
.mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
.collect(Collectors.toList())); .collect(Collectors.toList()));
@ -96,7 +108,7 @@ public class TestAsyncBufferMutator {
} }
// mutator.close will call mutator.flush automatically so all tasks should have been done. // mutator.close will call mutator.flush automatically so all tasks should have been done.
futures.forEach(f -> f.join()); futures.forEach(f -> f.join());
AsyncTable<?> table = CONN.getTable(TABLE_NAME); AsyncTable<?> table = CONN.getTable(tableName);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join()) IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
.forEach(r -> { .forEach(r -> {
assertArrayEquals(VALUE, r.getValue(CF, CQ)); assertArrayEquals(VALUE, r.getValue(CF, CQ));