HBASE-16872 Implement mutateRow and checkAndMutate

This commit is contained in:
zhangduo 2016-10-19 20:21:20 +08:00
parent ee1123b069
commit ddc84356dd
3 changed files with 187 additions and 11 deletions

View File

@ -271,4 +271,45 @@ public interface AsyncTable {
*/
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete);
/**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
* @return A {@link CompletableFuture} that always returns null when complete normally.
*/
CompletableFuture<Void> mutateRow(RowMutations mutation);
/**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
* performs the row mutations. If the passed value is null, the check is for the lack of column
* (ie: non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param value the expected value
* @param mutation mutations to perform if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
byte[] value, RowMutations mutation) {
return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation);
}
/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* performs the row mutations. If the passed value is null, the check is for the lack of column
* (ie: non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp the comparison operator
* @param value the expected value
* @param mutation mutations to perform if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation);
}

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
@ -34,12 +35,17 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The implementation of AsyncTable.
@ -232,6 +238,73 @@ class AsyncTableImpl implements AsyncTable {
.call();
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert,
Function<Result, RESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
try {
byte[] regionName = loc.getRegionInfo().getRegionName();
MultiRequest req = reqConvert.convert(regionName, mutation);
stub.multi(controller, req, new RpcCallback<MultiResponse>() {
@Override
public void run(MultiResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
.getResults(req, resp, controller.cellScanner());
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future
.completeExceptionally(ex instanceof IOException ? ex
: new IOException(
"Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
ex));
} else {
future.complete(respConverter
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
}
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation, writeRpcTimeoutNs).action((controller, loc,
stub) -> AsyncTableImpl.<Void> mutateRow(controller, loc, stub, mutation, (rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
}, (resp) -> {
return null;
})).call();
}
@Override
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation) {
return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
.action((controller, loc, stub) -> AsyncTableImpl.<Boolean> mutateRow(controller, loc, stub,
mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
(resp) -> resp.getExists()))
.call();
}
@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
@ -261,5 +334,4 @@ class AsyncTableImpl implements AsyncTable {
public long getOperationTimeout(TimeUnit unit) {
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
}
}

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -221,7 +223,8 @@ public class TestAsyncTable {
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> table
IntStream.range(0, count)
.forEach(i -> table
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> {
@ -242,4 +245,64 @@ public class TestAsyncTable {
}
});
}
@Test
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
RowMutations mutation = new RowMutations(row);
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
table.mutateRow(mutation).get();
Result result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
table.mutateRow(mutation).get();
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
}
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch mutateLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> {
RowMutations mutation = new RowMutations(row);
try {
mutation.add(new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, mutation).thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
mutateLatch.countDown();
});
});
mutateLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
}