HBASE-16837 Implement checkAndPut and checkAndDelete
This commit is contained in:
parent
ef8c65e542
commit
acc606571b
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
|
@ -204,4 +205,70 @@ public interface AsyncTable {
|
|||
new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
|
||||
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) {
|
||||
return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) {
|
||||
return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncTable.
|
||||
|
@ -151,12 +154,16 @@ class AsyncTableImpl implements AsyncTable {
|
|||
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
||||
return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
|
||||
return conn.callerFactory.<T> single().table(tableName).row(row)
|
||||
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
|
||||
return newCaller(row.getRow(), rpcTimeoutNs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Result> get(Get get) {
|
||||
return this.<Result> newCaller(get, readRpcTimeoutNs)
|
||||
|
@ -201,6 +208,30 @@ class AsyncTableImpl implements AsyncTable {
|
|||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Put put) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Put, Boolean> mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOp compareOp, byte[] value, Delete delete) {
|
||||
return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> AsyncTableImpl.<Delete, Boolean> mutate(controller, loc,
|
||||
stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
|
||||
this.readRpcTimeoutNs = unit.toNanos(timeout);
|
||||
|
@ -230,4 +261,5 @@ class AsyncTableImpl implements AsyncTable {
|
|||
public long getOperationTimeout(TimeUnit unit) {
|
||||
return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
}
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
@ -184,4 +185,61 @@ public class TestAsyncTable {
|
|||
.sorted().toArray();
|
||||
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndPut() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
AtomicInteger successCount = new AtomicInteger(0);
|
||||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
int count = 10;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null,
|
||||
new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
latch.countDown();
|
||||
}));
|
||||
latch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
|
||||
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
|
||||
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
|
||||
int count = 10;
|
||||
CountDownLatch putLatch = new CountDownLatch(count + 1);
|
||||
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
|
||||
IntStream.range(0, count)
|
||||
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
|
||||
.thenRun(() -> putLatch.countDown()));
|
||||
putLatch.await();
|
||||
|
||||
AtomicInteger successCount = new AtomicInteger(0);
|
||||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
CountDownLatch deleteLatch = new CountDownLatch(count);
|
||||
IntStream.range(0, count).forEach(i -> table
|
||||
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
|
||||
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
deleteLatch.countDown();
|
||||
}));
|
||||
deleteLatch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
Result result = table.get(new Get(row)).get();
|
||||
IntStream.range(0, count).forEach(i -> {
|
||||
if (i == successIndex.get()) {
|
||||
assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
|
||||
} else {
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue