From ddc84356dd05b50352edbe75125a9a2c6d025689 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 19 Oct 2016 20:21:20 +0800 Subject: [PATCH] HBASE-16872 Implement mutateRow and checkAndMutate --- .../hadoop/hbase/client/AsyncTable.java | 41 +++++++++ .../hadoop/hbase/client/AsyncTableImpl.java | 74 ++++++++++++++++- .../hadoop/hbase/client/TestAsyncTable.java | 83 ++++++++++++++++--- 3 files changed, 187 insertions(+), 11 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 6019bdcfb48..46427463675 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -271,4 +271,45 @@ public interface AsyncTable { */ CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Delete delete); + + /** + * Performs multiple mutations atomically on a single row. Currently {@link Put} and + * {@link Delete} are supported. + * @param mutation object that specifies the set of mutations to perform atomically + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + CompletableFuture mutateRow(RowMutations mutation); + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + default CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + byte[] value, RowMutations mutation) { + return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index b7dc38877c1..77a5bbef0ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; @@ -34,12 +35,17 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; +import org.apache.hadoop.hbase.util.Bytes; /** * The implementation of AsyncTable. @@ -232,6 +238,73 @@ class AsyncTableImpl implements AsyncTable { .call(); } + // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, + // so here I write a new method as I do not want to change the abstraction of call method. + private static CompletableFuture mutateRow(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, + Converter reqConvert, + Function respConverter) { + CompletableFuture future = new CompletableFuture<>(); + try { + byte[] regionName = loc.getRegionInfo().getRegionName(); + MultiRequest req = reqConvert.convert(regionName, mutation); + stub.multi(controller, req, new RpcCallback() { + + @Override + public void run(MultiResponse resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter + .getResults(req, resp, controller.cellScanner()); + Throwable ex = multiResp.getException(regionName); + if (ex != null) { + future + .completeExceptionally(ex instanceof IOException ? ex + : new IOException( + "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), + ex)); + } else { + future.complete(respConverter + .apply((Result) multiResp.getResults().get(regionName).result.get(0))); + } + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public CompletableFuture mutateRow(RowMutations mutation) { + return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, + stub) -> AsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); + regionMutationBuilder.setAtomic(true); + return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + }, (resp) -> { + return null; + })).call(); + } + + @Override + public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) { + return this. newCaller(mutation, writeRpcTimeoutNs) + .action((controller, loc, stub) -> AsyncTableImpl. mutateRow(controller, loc, stub, + mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm), + (resp) -> resp.getExists())) + .call(); + } + @Override public void setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); @@ -261,5 +334,4 @@ class AsyncTableImpl implements AsyncTable { public long getOperationTimeout(TimeUnit unit) { return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); } - } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 8b3ab622d13..8ba34146224 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -221,16 +223,17 @@ public class TestAsyncTable { AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); CountDownLatch deleteLatch = new CountDownLatch(count); - IntStream.range(0, count).forEach(i -> table - .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, - new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) - .thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - deleteLatch.countDown(); - })); + IntStream.range(0, count) + .forEach(i -> table + .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); deleteLatch.await(); assertEquals(1, successCount.get()); Result result = table.get(new Get(row)).get(); @@ -242,4 +245,64 @@ public class TestAsyncTable { } }); } + + @Test + public void testMutateRow() throws InterruptedException, ExecutionException, IOException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + RowMutations mutation = new RowMutations(row); + mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); + table.mutateRow(mutation).get(); + Result result = table.get(new Get(row)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); + + mutation = new RowMutations(row); + mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); + mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); + table.mutateRow(mutation).get(); + result = table.get(new Get(row)).get(); + assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); + } + + @Test + public void testCheckAndMutate() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 10; + CountDownLatch putLatch = new CountDownLatch(count + 1); + table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); + IntStream.range(0, count) + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); + putLatch.await(); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + CountDownLatch mutateLatch = new CountDownLatch(count); + IntStream.range(0, count).forEach(i -> { + RowMutations mutation = new RowMutations(row); + try { + mutation.add(new Delete(row).addColumn(FAMILY, QUALIFIER)); + mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, mutation).thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + mutateLatch.countDown(); + }); + }); + mutateLatch.await(); + assertEquals(1, successCount.get()); + Result result = table.get(new Get(row)).get(); + IntStream.range(0, count).forEach(i -> { + if (i == successIndex.get()) { + assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); + } else { + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); + } + }); + } }