HBASE-18951 Use Builder pattern to remove nullable parameters for checkAndXXX methods in RawAsyncTable/AsyncTable interface
This commit is contained in:
parent
c3b3fd7888
commit
8597b19b3d
|
@ -190,72 +190,75 @@ public interface AsyncTableBase {
|
|||
.thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Put put) {
|
||||
return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, put);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the put. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param put data to put if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
* adds the Put/Delete/RowMutations.
|
||||
* <p>
|
||||
* Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
|
||||
* This is a fluent style API, the code is like:
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
|
||||
* .thenAccept(succ -> {
|
||||
* if (succ) {
|
||||
* System.out.println("Check and put succeeded");
|
||||
* } else {
|
||||
* System.out.println("Check and put failed");
|
||||
* }
|
||||
* });
|
||||
* </code>
|
||||
* </pre>
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator compareOp, byte[] value, Put put);
|
||||
CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
* A helper class for sending checkAndMutate request.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, Delete delete) {
|
||||
return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, delete);
|
||||
interface CheckAndMutateBuilder {
|
||||
|
||||
/**
|
||||
* @param qualifier column qualifier to check.
|
||||
*/
|
||||
CheckAndMutateBuilder qualifier(byte[] qualifier);
|
||||
|
||||
/**
|
||||
* Check for lack of column.
|
||||
*/
|
||||
CheckAndMutateBuilder ifNotExists();
|
||||
|
||||
default CheckAndMutateBuilder ifEquals(byte[] value) {
|
||||
return ifMatches(CompareOperator.EQUAL, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
*/
|
||||
CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
|
||||
|
||||
/**
|
||||
* @param put data to put if check succeeds
|
||||
* @return {@code true} if the new put was executed, {@code false} otherwise. The return value
|
||||
* will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenPut(Put put);
|
||||
|
||||
/**
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return {@code true} if the new delete was executed, {@code false} otherwise. The return
|
||||
* value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenDelete(Delete delete);
|
||||
|
||||
/**
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new mutation was executed, false otherwise. The return value will be
|
||||
* wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
|
||||
* non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp comparison operator to use
|
||||
* @param value the expected value
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator compareOp, byte[] value, Delete delete);
|
||||
|
||||
/**
|
||||
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
|
||||
* {@link Delete} are supported.
|
||||
|
@ -264,39 +267,6 @@ public interface AsyncTableBase {
|
|||
*/
|
||||
CompletableFuture<Void> mutateRow(RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
byte[] value, RowMutations mutation) {
|
||||
return checkAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
|
||||
* performs the row mutations. If the passed value is null, the check is for the lack of column
|
||||
* (ie: non-existence)
|
||||
* @param row to check
|
||||
* @param family column family to check
|
||||
* @param qualifier column qualifier to check
|
||||
* @param compareOp the comparison operator
|
||||
* @param value the expected value
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
|
||||
* a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator compareOp, byte[] value, RowMutations mutation);
|
||||
|
||||
/**
|
||||
* Return all the results that match the given scan object.
|
||||
* <p>
|
||||
|
|
|
@ -17,18 +17,19 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import static java.util.stream.Collectors.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncTable. Based on {@link RawAsyncTable}.
|
||||
|
@ -121,15 +122,44 @@ class AsyncTableImpl implements AsyncTable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Put put) {
|
||||
return wrap(rawTable.checkAndPut(row, family, qualifier, op, value, put));
|
||||
}
|
||||
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
|
||||
return new CheckAndMutateBuilder() {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Delete delete) {
|
||||
return wrap(rawTable.checkAndDelete(row, family, qualifier, op, value, delete));
|
||||
private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
return wrap(builder.thenPut(put));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
return wrap(builder.thenMutate(mutation));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
return wrap(builder.thenDelete(delete));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
|
||||
builder.qualifier(qualifier);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder ifNotExists() {
|
||||
builder.ifNotExists();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
|
||||
builder.ifMatches(compareOp, value);
|
||||
return this;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -137,12 +167,6 @@ class AsyncTableImpl implements AsyncTable {
|
|||
return wrap(rawTable.mutateRow(mutation));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, RowMutations mutation) {
|
||||
return wrap(rawTable.checkAndMutate(row, family, qualifier, op, value, mutation));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<Result>> scanAll(Scan scan) {
|
||||
return wrap(rawTable.scanAll(scan));
|
||||
|
@ -198,5 +222,4 @@ class AsyncTableImpl implements AsyncTable {
|
|||
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
|
||||
return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
|||
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
|
@ -59,8 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
|
||||
import com.google.protobuf.RpcChannel;
|
||||
|
||||
/**
|
||||
* The implementation of RawAsyncTable.
|
||||
*/
|
||||
|
@ -134,7 +135,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
Converter<RESP, HBaseRpcController, PRESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req),
|
||||
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
|
||||
new RpcCallback<PRESP>() {
|
||||
|
||||
@Override
|
||||
|
@ -251,28 +252,89 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Put put) {
|
||||
return this.<Boolean> newCaller(row, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
|
||||
|
||||
private final byte[] row;
|
||||
|
||||
private final byte[] family;
|
||||
|
||||
private byte[] qualifier;
|
||||
|
||||
private CompareOperator op;
|
||||
|
||||
private byte[] value;
|
||||
|
||||
public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
|
||||
this.row = Preconditions.checkNotNull(row, "row is null");
|
||||
this.family = Preconditions.checkNotNull(family, "family is null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
|
||||
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
|
||||
" an empty byte array, or just do not call this method if you want a null qualifier");
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder ifNotExists() {
|
||||
this.op = CompareOperator.EQUAL;
|
||||
this.value = null;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
|
||||
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
|
||||
this.value = Preconditions.checkNotNull(value, "value is null");
|
||||
return this;
|
||||
}
|
||||
|
||||
private void preCheck() {
|
||||
Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
|
||||
" calling ifNotExists/ifEquals/ifMatches before executing the request");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
|
||||
loc, stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
|
||||
stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
|
||||
resp -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, Delete delete) {
|
||||
return this.<Boolean> newCaller(row, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
|
||||
return new CheckAndMutateBuilderImpl(row, family);
|
||||
}
|
||||
|
||||
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
|
||||
|
@ -283,7 +345,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
Function<Result, RESP> respConverter) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
try {
|
||||
byte[] regionName = loc.getRegionInfo().getRegionName();
|
||||
byte[] regionName = loc.getRegion().getRegionName();
|
||||
MultiRequest req = reqConvert.convert(regionName, mutation);
|
||||
stub.multi(controller, req, new RpcCallback<MultiResponse>() {
|
||||
|
||||
|
@ -328,18 +390,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
}, resp -> null)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, byte[] value, RowMutations mutation) {
|
||||
return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
|
||||
stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
|
||||
resp -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
|
||||
private Scan setDefaultScanConfig(Scan scan) {
|
||||
// always create a new scan object as we may reset the start row later.
|
||||
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
|
||||
|
@ -488,7 +538,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
return;
|
||||
}
|
||||
unfinishedRequest.incrementAndGet();
|
||||
RegionInfo region = loc.getRegionInfo();
|
||||
RegionInfo region = loc.getRegion();
|
||||
if (locateFinished(region, endKey, endKeyInclusive)) {
|
||||
locateFinished.set(true);
|
||||
} else {
|
||||
|
@ -524,4 +574,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
(loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
|
||||
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -221,14 +221,15 @@ public class TestAsyncTable {
|
|||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
int count = 10;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null,
|
||||
new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
latch.countDown();
|
||||
}));
|
||||
IntStream.range(0, count)
|
||||
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
|
||||
.thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
latch.countDown();
|
||||
}));
|
||||
latch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
|
||||
|
@ -249,16 +250,17 @@ public class TestAsyncTable {
|
|||
AtomicInteger successCount = new AtomicInteger(0);
|
||||
AtomicInteger successIndex = new AtomicInteger(-1);
|
||||
CountDownLatch deleteLatch = new CountDownLatch(count);
|
||||
IntStream.range(0, count).forEach(i -> table
|
||||
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE,
|
||||
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
deleteLatch.countDown();
|
||||
}));
|
||||
IntStream.range(0, count)
|
||||
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
|
||||
.thenDelete(
|
||||
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
deleteLatch.countDown();
|
||||
}));
|
||||
deleteLatch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
Result result = table.get(new Get(row)).get();
|
||||
|
@ -311,13 +313,14 @@ public class TestAsyncTable {
|
|||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, mutation).thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
mutateLatch.countDown();
|
||||
});
|
||||
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
|
||||
.thenAccept(x -> {
|
||||
if (x) {
|
||||
successCount.incrementAndGet();
|
||||
successIndex.set(i);
|
||||
}
|
||||
mutateLatch.countDown();
|
||||
});
|
||||
});
|
||||
mutateLatch.await();
|
||||
assertEquals(1, successCount.get());
|
||||
|
|
Loading…
Reference in New Issue