HBASE-18951 Use Builder pattern to remove nullable parameters for checkAndXXX methods in RawAsyncTable/AsyncTable interface

This commit is contained in:
zhangduo 2017-10-10 11:11:19 +08:00
parent 294f6b7860
commit d5b76547f0
4 changed files with 216 additions and 169 deletions

View File

@ -191,71 +191,74 @@ public interface AsyncTableBase {
} }
/** /**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the put. If the passed value is null, the check is for the lack of column (ie: * adds the Put/Delete/RowMutations.
* non-existence) * <p>
* @param row to check * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
* @param family column family to check * This is a fluent style API, the code is like:
* @param qualifier column qualifier to check *
* @param value the expected value * <pre>
* @param put data to put if check succeeds * <code>
* @return true if the new put was executed, false otherwise. The return value will be wrapped by * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
* a {@link CompletableFuture}. * .thenAccept(succ -> {
* if (succ) {
* System.out.println("Check and put succeeded");
* } else {
* System.out.println("Check and put failed");
* }
* });
* </code>
* </pre>
*/ */
default CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
byte[] value, Put put) {
return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, put); /**
* A helper class for sending checkAndMutate request.
*/
interface CheckAndMutateBuilder {
/**
* @param qualifier column qualifier to check.
*/
CheckAndMutateBuilder qualifier(byte[] qualifier);
/**
* Check for lack of column.
*/
CheckAndMutateBuilder ifNotExists();
default CheckAndMutateBuilder ifEquals(byte[] value) {
return ifMatches(CompareOperator.EQUAL, value);
} }
/** /**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the put. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use * @param compareOp comparison operator to use
* @param value the expected value * @param value the expected value
*/
CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
/**
* @param put data to put if check succeeds * @param put data to put if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
* a {@link CompletableFuture}. * will be wrapped by a {@link CompletableFuture}.
*/ */
CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompletableFuture<Boolean> thenPut(Put put);
CompareOperator compareOp, byte[] value, Put put);
/** /**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param value the expected value
* @param delete data to delete if check succeeds * @param delete data to delete if check succeeds
* @return true if the new delete was executed, false otherwise. The return value will be wrapped * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
* by a {@link CompletableFuture}. * value will be wrapped by a {@link CompletableFuture}.
*/ */
default CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompletableFuture<Boolean> thenDelete(Delete delete);
byte[] value, Delete delete) {
return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, delete); /**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise. The return value will be
* wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
} }
/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the delete. If the passed value is null, the check is for the lack of column (ie:
* non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @param delete data to delete if check succeeds
* @return true if the new delete was executed, false otherwise. The return value will be wrapped
* by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOperator compareOp, byte[] value, Delete delete);
/** /**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and * Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported. * {@link Delete} are supported.
@ -264,39 +267,6 @@ public interface AsyncTableBase {
*/ */
CompletableFuture<Void> mutateRow(RowMutations mutation); CompletableFuture<Void> mutateRow(RowMutations mutation);
/**
* Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it
* performs the row mutations. If the passed value is null, the check is for the lack of column
* (ie: non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param value the expected value
* @param mutation mutations to perform if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
default CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
byte[] value, RowMutations mutation) {
return checkAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, mutation);
}
/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* performs the row mutations. If the passed value is null, the check is for the lack of column
* (ie: non-existence)
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp the comparison operator
* @param value the expected value
* @param mutation mutations to perform if check succeeds
* @return true if the new put was executed, false otherwise. The return value will be wrapped by
* a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator compareOp, byte[] value, RowMutations mutation);
/** /**
* Return all the results that match the given scan object. * Return all the results that match the given scan object.
* <p> * <p>

View File

@ -17,18 +17,19 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.stream.Collectors.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* The implementation of AsyncTable. Based on {@link RawAsyncTable}. * The implementation of AsyncTable. Based on {@link RawAsyncTable}.
@ -121,15 +122,44 @@ class AsyncTableImpl implements AsyncTable {
} }
@Override @Override
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
CompareOperator op, byte[] value, Put put) { return new CheckAndMutateBuilder() {
return wrap(rawTable.checkAndPut(row, family, qualifier, op, value, put));
private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
return wrap(builder.thenPut(put));
} }
@Override @Override
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
CompareOperator op, byte[] value, Delete delete) { return wrap(builder.thenMutate(mutation));
return wrap(rawTable.checkAndDelete(row, family, qualifier, op, value, delete)); }
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
return wrap(builder.thenDelete(delete));
}
@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
builder.qualifier(qualifier);
return this;
}
@Override
public CheckAndMutateBuilder ifNotExists() {
builder.ifNotExists();
return this;
}
@Override
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
builder.ifMatches(compareOp, value);
return this;
}
};
} }
@Override @Override
@ -137,12 +167,6 @@ class AsyncTableImpl implements AsyncTable {
return wrap(rawTable.mutateRow(mutation)); return wrap(rawTable.mutateRow(mutation));
} }
@Override
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, byte[] value, RowMutations mutation) {
return wrap(rawTable.checkAndMutate(row, family, qualifier, op, value, mutation));
}
@Override @Override
public CompletableFuture<List<Result>> scanAll(Scan scan) { public CompletableFuture<List<Result>> scanAll(Scan scan) {
return wrap(rawTable.scanAll(scan)); return wrap(rawTable.scanAll(scan));
@ -198,5 +222,4 @@ class AsyncTableImpl implements AsyncTable {
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
} }
} }

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import com.google.protobuf.RpcChannel;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@ -59,8 +62,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import com.google.protobuf.RpcChannel;
/** /**
* The implementation of RawAsyncTable. * The implementation of RawAsyncTable.
*/ */
@ -134,7 +135,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
Converter<RESP, HBaseRpcController, PRESP> respConverter) { Converter<RESP, HBaseRpcController, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>(); CompletableFuture<RESP> future = new CompletableFuture<>();
try { try {
rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
new RpcCallback<PRESP>() { new RpcCallback<PRESP>() {
@Override @Override
@ -251,12 +252,55 @@ class RawAsyncTableImpl implements RawAsyncTable {
.call(); .call();
} }
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
private CompareOperator op;
private byte[] value;
public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
this.row = Preconditions.checkNotNull(row, "row is null");
this.family = Preconditions.checkNotNull(family, "family is null");
}
@Override @Override
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier, public CheckAndMutateBuilder qualifier(byte[] qualifier) {
CompareOperator op, byte[] value, Put put) { this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
return this.<Boolean> newCaller(row, rpcTimeoutNs) " an empty byte array, or just do not call this method if you want a null qualifier");
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc, return this;
stub, put, }
@Override
public CheckAndMutateBuilder ifNotExists() {
this.op = CompareOperator.EQUAL;
this.value = null;
return this;
}
@Override
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
this.value = Preconditions.checkNotNull(value, "value is null");
return this;
}
private void preCheck() {
Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
" calling ifNotExists/ifEquals/ifMatches before executing the request");
}
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), p), new BinaryComparator(value), CompareType.valueOf(op.name()), p),
(c, r) -> r.getProcessed())) (c, r) -> r.getProcessed()))
@ -264,9 +308,9 @@ class RawAsyncTableImpl implements RawAsyncTable {
} }
@Override @Override
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier, public CompletableFuture<Boolean> thenDelete(Delete delete) {
CompareOperator op, byte[] value, Delete delete) { preCheck();
return this.<Boolean> newCaller(row, rpcTimeoutNs) return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@ -275,6 +319,24 @@ class RawAsyncTableImpl implements RawAsyncTable {
.call(); .call();
} }
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
resp -> resp.getExists()))
.call();
}
}
@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(row, family);
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method. // so here I write a new method as I do not want to change the abstraction of call method.
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
@ -283,7 +345,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
Function<Result, RESP> respConverter) { Function<Result, RESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>(); CompletableFuture<RESP> future = new CompletableFuture<>();
try { try {
byte[] regionName = loc.getRegionInfo().getRegionName(); byte[] regionName = loc.getRegion().getRegionName();
MultiRequest req = reqConvert.convert(regionName, mutation); MultiRequest req = reqConvert.convert(regionName, mutation);
stub.multi(controller, req, new RpcCallback<MultiResponse>() { stub.multi(controller, req, new RpcCallback<MultiResponse>() {
@ -328,18 +390,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
}, resp -> null)).call(); }, resp -> null)).call();
} }
@Override
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, byte[] value, RowMutations mutation) {
return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
resp -> resp.getExists()))
.call();
}
private Scan setDefaultScanConfig(Scan scan) { private Scan setDefaultScanConfig(Scan scan) {
// always create a new scan object as we may reset the start row later. // always create a new scan object as we may reset the start row later.
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
@ -488,7 +538,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
return; return;
} }
unfinishedRequest.incrementAndGet(); unfinishedRequest.incrementAndGet();
RegionInfo region = loc.getRegionInfo(); RegionInfo region = loc.getRegion();
if (locateFinished(region, endKey, endKeyInclusive)) { if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true); locateFinished.set(true);
} else { } else {
@ -524,4 +574,5 @@ class RawAsyncTableImpl implements RawAsyncTable {
(loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey, (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
} }
} }

View File

@ -221,8 +221,9 @@ public class TestAsyncTable {
AtomicInteger successIndex = new AtomicInteger(-1); AtomicInteger successIndex = new AtomicInteger(-1);
int count = 10; int count = 10;
CountDownLatch latch = new CountDownLatch(count); CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> table.checkAndPut(row, FAMILY, QUALIFIER, null, IntStream.range(0, count)
new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
.thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
if (x) { if (x) {
successCount.incrementAndGet(); successCount.incrementAndGet();
successIndex.set(i); successIndex.set(i);
@ -249,8 +250,9 @@ public class TestAsyncTable {
AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1); AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count); CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> table IntStream.range(0, count)
.checkAndDelete(row, FAMILY, QUALIFIER, VALUE, .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
.thenDelete(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> { .thenAccept(x -> {
if (x) { if (x) {
@ -311,7 +313,8 @@ public class TestAsyncTable {
} catch (IOException e) { } catch (IOException e) {
throw new UncheckedIOException(e); throw new UncheckedIOException(e);
} }
table.checkAndMutate(row, FAMILY, QUALIFIER, VALUE, mutation).thenAccept(x -> { table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
.thenAccept(x -> {
if (x) { if (x) {
successCount.incrementAndGet(); successCount.incrementAndGet();
successIndex.set(i); successIndex.set(i);