HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
92c7a4134d
commit
c3edceb6ae
|
@ -29,6 +29,7 @@ import java.util.function.Function;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -289,6 +290,60 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
|
|||
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the specified filter. If it does, it adds the
|
||||
* Put/Delete/RowMutations.
|
||||
* <p>
|
||||
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
|
||||
* execute it. This is a fluent style API, the code is like:
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* table.checkAndMutate(row, filter).thenPut(put)
|
||||
* .thenAccept(succ -> {
|
||||
* if (succ) {
|
||||
* System.out.println("Check and put succeeded");
|
||||
* } else {
|
||||
* System.out.println("Check and put failed");
|
||||
* }
|
||||
* });
|
||||
* </code>
|
||||
* </pre>
|
||||
*/
|
||||
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
|
||||
|
||||
/**
|
||||
* A helper class for sending checkAndMutate request with a filter.
|
||||
*/
|
||||
interface CheckAndMutateWithFilterBuilder {
|
||||
|
||||
/**
|
||||
* @param timeRange time range to check.
|
||||
*/
|
||||
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
|
||||
|
||||
/**
|
||||
* @param put data to put if check succeeds
|
||||
* @return {@code true} if the new put was executed, {@code false} otherwise. The return value
|
||||
* will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenPut(Put put);
|
||||
|
||||
/**
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return {@code true} if the new delete was executed, {@code false} otherwise. The return
|
||||
* value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenDelete(Delete delete);
|
||||
|
||||
/**
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new mutation was executed, false otherwise. The return value will be
|
||||
* wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
|
||||
* {@link Delete} are supported.
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.function.Function;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -173,6 +174,36 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
return new CheckAndMutateWithFilterBuilder() {
|
||||
|
||||
private final CheckAndMutateWithFilterBuilder builder =
|
||||
rawTable.checkAndMutate(row, filter);
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
|
||||
builder.timeRange(timeRange);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
return wrap(builder.thenPut(put));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
return wrap(builder.thenDelete(delete));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
return wrap(builder.thenMutate(mutation));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
|
||||
return wrap(rawTable.mutateRow(mutation));
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
@ -39,7 +40,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
@ -676,14 +675,16 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final byte [] value, final Put put) throws IOException {
|
||||
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
|
||||
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
|
||||
put);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
||||
return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
|
||||
return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, put);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -692,21 +693,20 @@ public class HTable implements Table {
|
|||
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
||||
// The name of the operators in CompareOperator are intentionally those of the
|
||||
// operators in the filter's CompareOp enum.
|
||||
return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
|
||||
return doCheckAndPut(row, family, qualifier, op, value, null, null, put);
|
||||
}
|
||||
|
||||
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final String opName, final byte[] value, final TimeRange timeRange, final Put put)
|
||||
throws IOException {
|
||||
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
|
||||
final Put put) throws IOException {
|
||||
ClientServiceCallable<Boolean> callable =
|
||||
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
|
||||
this.rpcControllerFactory.newController(), put.getPriority()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
CompareType compareType = CompareType.valueOf(opName);
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||
new BinaryComparator(value), compareType, timeRange, put);
|
||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
|
||||
filter, timeRange, put);
|
||||
MutateResponse response = doMutate(request);
|
||||
return Boolean.valueOf(response.getProcessed());
|
||||
}
|
||||
|
@ -719,37 +719,37 @@ public class HTable implements Table {
|
|||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final byte[] value, final Delete delete) throws IOException {
|
||||
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
|
||||
delete);
|
||||
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
|
||||
null, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
||||
return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
|
||||
return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
||||
return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
|
||||
return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete);
|
||||
}
|
||||
|
||||
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
|
||||
throws IOException {
|
||||
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
|
||||
final Delete delete) throws IOException {
|
||||
CancellableRegionServerCallable<SingleResponse> callable =
|
||||
new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
|
||||
this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
|
||||
new RetryingTimeTracker().start(), delete.getPriority()) {
|
||||
@Override
|
||||
protected SingleResponse rpcCall() throws Exception {
|
||||
CompareType compareType = CompareType.valueOf(opName);
|
||||
MutateRequest request = RequestConverter
|
||||
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, new BinaryComparator(value), compareType, timeRange, delete);
|
||||
qualifier, op, value, filter, timeRange, delete);
|
||||
MutateResponse response = doMutate(request);
|
||||
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
|
||||
}
|
||||
|
@ -776,8 +776,14 @@ public class HTable implements Table {
|
|||
return new CheckAndMutateBuilderImpl(row, family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
|
||||
}
|
||||
|
||||
private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
|
||||
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
|
||||
final RowMutations rm)
|
||||
throws IOException {
|
||||
CancellableRegionServerCallable<MultiResponse> callable =
|
||||
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
||||
|
@ -785,10 +791,9 @@ public class HTable implements Table {
|
|||
rm.getMaxPriority()) {
|
||||
@Override
|
||||
protected MultiResponse rpcCall() throws Exception {
|
||||
CompareType compareType = CompareType.valueOf(opName);
|
||||
MultiRequest request = RequestConverter
|
||||
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||
new BinaryComparator(value), compareType, timeRange, rm);
|
||||
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, op, value, filter, timeRange, rm);
|
||||
ClientProtos.MultiResponse response = doMulti(request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
|
@ -833,14 +838,43 @@ public class HTable implements Table {
|
|||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOp compareOp, final byte [] value, final RowMutations rm)
|
||||
throws IOException {
|
||||
return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
|
||||
return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
|
||||
null, rm);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
||||
return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
|
||||
return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm);
|
||||
}
|
||||
|
||||
private CompareOperator toCompareOperator(CompareOp compareOp) {
|
||||
switch (compareOp) {
|
||||
case LESS:
|
||||
return CompareOperator.LESS;
|
||||
|
||||
case LESS_OR_EQUAL:
|
||||
return CompareOperator.LESS_OR_EQUAL;
|
||||
|
||||
case EQUAL:
|
||||
return CompareOperator.EQUAL;
|
||||
|
||||
case NOT_EQUAL:
|
||||
return CompareOperator.NOT_EQUAL;
|
||||
|
||||
case GREATER_OR_EQUAL:
|
||||
return CompareOperator.GREATER_OR_EQUAL;
|
||||
|
||||
case GREATER:
|
||||
return CompareOperator.GREATER;
|
||||
|
||||
case NO_OP:
|
||||
return CompareOperator.NO_OP;
|
||||
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1247,19 +1281,54 @@ public class HTable implements Table {
|
|||
public boolean thenPut(Put put) throws IOException {
|
||||
validatePut(put);
|
||||
preCheck();
|
||||
return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
|
||||
return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
preCheck();
|
||||
return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
|
||||
return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
preCheck();
|
||||
return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
|
||||
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange,
|
||||
mutation);
|
||||
}
|
||||
}
|
||||
|
||||
private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
|
||||
|
||||
private final byte[] row;
|
||||
private final Filter filter;
|
||||
private TimeRange timeRange;
|
||||
|
||||
CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
|
||||
this.row = Preconditions.checkNotNull(row, "row is null");
|
||||
this.filter = Preconditions.checkNotNull(filter, "filter is null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
|
||||
this.timeRange = timeRange;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenPut(Put put) throws IOException {
|
||||
validatePut(put);
|
||||
return doCheckAndPut(row, null, null, null, null, filter, timeRange, put);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenDelete(Delete delete) throws IOException {
|
||||
return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRespo
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
|
||||
|
||||
/**
|
||||
* The implementation of RawAsyncTable.
|
||||
|
@ -358,10 +357,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -370,10 +369,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -381,12 +380,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
preCheck();
|
||||
return RawAsyncTableImpl.this
|
||||
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
|
||||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
loc, stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, rm),
|
||||
resp -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
|
@ -397,6 +396,68 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
return new CheckAndMutateBuilderImpl(row, family);
|
||||
}
|
||||
|
||||
|
||||
private final class CheckAndMutateWithFilterBuilderImpl
|
||||
implements CheckAndMutateWithFilterBuilder {
|
||||
|
||||
private final byte[] row;
|
||||
|
||||
private final Filter filter;
|
||||
|
||||
private TimeRange timeRange;
|
||||
|
||||
public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
|
||||
this.row = Preconditions.checkNotNull(row, "row is null");
|
||||
this.filter = Preconditions.checkNotNull(filter, "filter is null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
|
||||
this.timeRange = timeRange;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenPut(Put put) {
|
||||
validatePut(put, conn.connConf.getMaxKeyValueSize());
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, p),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenDelete(Delete delete) {
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, d),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
|
||||
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
|
||||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
loc, stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, rm),
|
||||
resp -> resp.getExists()))
|
||||
.call();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
|
||||
}
|
||||
|
||||
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
|
||||
// so here I write a new method as I do not want to change the abstraction of call method.
|
||||
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -541,6 +542,53 @@ public interface Table extends Closeable {
|
|||
* @return {@code true} if the new delete was executed, {@code false} otherwise.
|
||||
*/
|
||||
boolean thenDelete(Delete delete) throws IOException;
|
||||
|
||||
/**
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new mutation was executed, false otherwise.
|
||||
*/
|
||||
boolean thenMutate(RowMutations mutation) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the specified filter. If it does, it adds the
|
||||
* Put/Delete/RowMutations.
|
||||
* <p>
|
||||
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
|
||||
* execute it. This is a fluent style API, the code is like:
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* table.checkAndMutate(row, filter).thenPut(put);
|
||||
* </code>
|
||||
* </pre>
|
||||
*/
|
||||
default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
throw new NotImplementedException("Add an implementation!");
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper class for sending checkAndMutate request with a filter.
|
||||
*/
|
||||
interface CheckAndMutateWithFilterBuilder {
|
||||
|
||||
/**
|
||||
* @param timeRange timeRange to check
|
||||
*/
|
||||
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
|
||||
|
||||
/**
|
||||
* @param put data to put if check succeeds
|
||||
* @return {@code true} if the new put was executed, {@code false} otherwise.
|
||||
*/
|
||||
boolean thenPut(Put put) throws IOException;
|
||||
|
||||
/**
|
||||
* @param delete data to delete if check succeeds
|
||||
* @return {@code true} if the new delete was executed, {@code false} otherwise.
|
||||
*/
|
||||
boolean thenDelete(Delete delete) throws IOException;
|
||||
|
||||
/**
|
||||
* @param mutation mutations to perform if check succeeds
|
||||
* @return true if the new mutation was executed, false otherwise.
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
|
@ -56,7 +57,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
|
@ -236,71 +238,51 @@ public final class RequestConverter {
|
|||
/**
|
||||
* Create a protocol buffer MutateRequest for a conditioned put
|
||||
*
|
||||
* @param regionName
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param comparator
|
||||
* @param compareType
|
||||
* @param put
|
||||
* @return a mutate request
|
||||
* @throws IOException
|
||||
*/
|
||||
public static MutateRequest buildMutateRequest(
|
||||
final byte[] regionName, final byte[] row, final byte[] family,
|
||||
final byte [] qualifier, final ByteArrayComparable comparator,
|
||||
final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
|
||||
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
|
||||
, put, MutationType.PUT);
|
||||
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
|
||||
final TimeRange timeRange, final Put put) throws IOException {
|
||||
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
|
||||
put, MutationType.PUT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer MutateRequest for a conditioned delete
|
||||
*
|
||||
* @param regionName
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param comparator
|
||||
* @param compareType
|
||||
* @param delete
|
||||
* @return a mutate request
|
||||
* @throws IOException
|
||||
*/
|
||||
public static MutateRequest buildMutateRequest(
|
||||
final byte[] regionName, final byte[] row, final byte[] family,
|
||||
final byte [] qualifier, final ByteArrayComparable comparator,
|
||||
final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
|
||||
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
|
||||
, delete, MutationType.DELETE);
|
||||
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
|
||||
final TimeRange timeRange, final Delete delete) throws IOException {
|
||||
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
|
||||
delete, MutationType.DELETE);
|
||||
}
|
||||
|
||||
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
|
||||
final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
|
||||
final CompareType compareType, TimeRange timeRange, final Mutation mutation,
|
||||
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
|
||||
final Filter filter, final TimeRange timeRange, final Mutation mutation,
|
||||
final MutationType type) throws IOException {
|
||||
return MutateRequest.newBuilder()
|
||||
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
|
||||
.setMutation(ProtobufUtil.toMutation(type, mutation))
|
||||
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
|
||||
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer MutateRequest for conditioned row mutations
|
||||
*
|
||||
* @param regionName
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param comparator
|
||||
* @param compareType
|
||||
* @param rowMutations
|
||||
* @return a mutate request
|
||||
* @throws IOException
|
||||
*/
|
||||
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
|
||||
final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
|
||||
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
|
||||
final RowMutations rowMutations) throws IOException {
|
||||
RegionAction.Builder builder =
|
||||
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
|
||||
|
@ -308,7 +290,7 @@ public final class RequestConverter {
|
|||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
|
||||
for (Mutation mutation: rowMutations.getMutations()) {
|
||||
MutationType mutateType = null;
|
||||
MutationType mutateType;
|
||||
if (mutation instanceof Put) {
|
||||
mutateType = MutationType.PUT;
|
||||
} else if (mutation instanceof Delete) {
|
||||
|
@ -324,7 +306,7 @@ public final class RequestConverter {
|
|||
builder.addAction(actionBuilder.build());
|
||||
}
|
||||
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
|
||||
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
|
||||
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -1080,25 +1062,26 @@ public final class RequestConverter {
|
|||
/**
|
||||
* Create a protocol buffer Condition
|
||||
*
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param comparator
|
||||
* @param compareType
|
||||
* @return a Condition
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Condition buildCondition(final byte[] row, final byte[] family,
|
||||
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
|
||||
final TimeRange timeRange) {
|
||||
return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
|
||||
.setFamily(UnsafeByteOperations.unsafeWrap(family))
|
||||
.setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
|
||||
HConstants.EMPTY_BYTE_ARRAY : qualifier))
|
||||
.setComparator(ProtobufUtil.toComparator(comparator))
|
||||
.setCompareType(compareType)
|
||||
.setTimeRange(ProtobufUtil.toTimeRange(timeRange))
|
||||
.build();
|
||||
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
|
||||
final TimeRange timeRange) throws IOException {
|
||||
|
||||
Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
|
||||
|
||||
if (filter != null) {
|
||||
builder.setFilter(ProtobufUtil.toFilter(filter));
|
||||
} else {
|
||||
builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
|
||||
.setQualifier(UnsafeByteOperations.unsafeWrap(
|
||||
qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
|
||||
.setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
|
||||
.setCompareType(CompareType.valueOf(op.name()));
|
||||
}
|
||||
|
||||
return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -139,11 +139,12 @@ message GetResponse {
|
|||
*/
|
||||
message Condition {
|
||||
required bytes row = 1;
|
||||
required bytes family = 2;
|
||||
required bytes qualifier = 3;
|
||||
required CompareType compare_type = 4;
|
||||
required Comparator comparator = 5;
|
||||
optional bytes family = 2;
|
||||
optional bytes qualifier = 3;
|
||||
optional CompareType compare_type = 4;
|
||||
optional Comparator comparator = 5;
|
||||
optional TimeRange time_range = 6;
|
||||
optional Filter filter = 7;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -139,11 +139,12 @@ message GetResponse {
|
|||
*/
|
||||
message Condition {
|
||||
required bytes row = 1;
|
||||
required bytes family = 2;
|
||||
required bytes qualifier = 3;
|
||||
required CompareType compare_type = 4;
|
||||
required Comparator comparator = 5;
|
||||
optional bytes family = 2;
|
||||
optional bytes qualifier = 3;
|
||||
optional CompareType compare_type = 4;
|
||||
optional Comparator comparator = 5;
|
||||
optional TimeRange time_range = 6;
|
||||
optional Filter filter = 7;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -24,6 +24,19 @@ import com.google.protobuf.Message;
|
|||
import com.google.protobuf.Service;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
|
@ -33,12 +46,13 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -65,19 +79,6 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
|
@ -791,6 +792,11 @@ public class RemoteHTable implements Table {
|
|||
return new CheckAndMutateBuilderImpl(row, family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
throw new NotImplementedException("Implement later");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -511,9 +512,8 @@ public interface RegionObserver {
|
|||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param put data to put if check succeeds
|
||||
* @param result
|
||||
* @return the return value to return to client if bypassing default
|
||||
* processing
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
|
||||
|
@ -521,6 +521,26 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndPut.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndPut but after acquiring rowlock.
|
||||
* <p>
|
||||
|
@ -540,9 +560,8 @@ public interface RegionObserver {
|
|||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param put data to put if check succeeds
|
||||
* @param result
|
||||
* @return the return value to return to client if bypassing default
|
||||
* processing
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
|
@ -550,6 +569,30 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndPut but after acquiring rowlock.
|
||||
* <p>
|
||||
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
|
||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after checkAndPut
|
||||
* <p>
|
||||
|
@ -571,6 +614,23 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after checkAndPut
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @param result from the checkAndPut
|
||||
* @return the possibly transformed return value to return to client
|
||||
*/
|
||||
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndDelete.
|
||||
* <p>
|
||||
|
@ -586,7 +646,7 @@ public interface RegionObserver {
|
|||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @param result
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
|
@ -595,6 +655,26 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndDelete.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter column family
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndDelete but after acquiring rowock.
|
||||
* <p>
|
||||
|
@ -614,7 +694,7 @@ public interface RegionObserver {
|
|||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @param result
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
|
@ -623,6 +703,30 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndDelete but after acquiring rowock.
|
||||
* <p>
|
||||
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
|
||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*/
|
||||
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after checkAndDelete
|
||||
* <p>
|
||||
|
@ -644,6 +748,23 @@ public interface RegionObserver {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after checkAndDelete
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @param result from the CheckAndDelete
|
||||
* @return the possibly transformed returned value to return to client
|
||||
*/
|
||||
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before Append.
|
||||
* <p>
|
||||
|
|
|
@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
|||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterWrapper;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
|
@ -4176,13 +4177,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
|
||||
checkMutationType(mutation, row);
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
|
||||
mutation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
|
||||
throws IOException {
|
||||
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
|
||||
throws IOException {
|
||||
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4190,7 +4204,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* switches in the few places where there is deviation.
|
||||
*/
|
||||
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
|
||||
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
|
||||
RowMutations rowMutations, Mutation mutation)
|
||||
throws IOException {
|
||||
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
|
||||
|
@ -4204,8 +4218,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
startRegionOperation();
|
||||
try {
|
||||
Get get = new Get(row);
|
||||
if (family != null) {
|
||||
checkFamily(family);
|
||||
get.addColumn(family, qualifier);
|
||||
}
|
||||
if (filter != null) {
|
||||
get.setFilter(filter);
|
||||
}
|
||||
if (timeRange != null) {
|
||||
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||
}
|
||||
|
@ -4217,11 +4236,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Call coprocessor.
|
||||
Boolean processed = null;
|
||||
if (mutation instanceof Put) {
|
||||
processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
|
||||
qualifier, op, comparator, (Put)mutation);
|
||||
if (filter != null) {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
|
||||
} else {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
|
||||
(Put) mutation);
|
||||
}
|
||||
} else if (mutation instanceof Delete) {
|
||||
processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
|
||||
qualifier, op, comparator, (Delete)mutation);
|
||||
if (filter != null) {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
|
||||
} else {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
|
||||
(Delete) mutation);
|
||||
}
|
||||
}
|
||||
if (processed != null) {
|
||||
return processed;
|
||||
|
@ -4231,9 +4262,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Supposition is that now all changes are done under row locks, then when we go to read,
|
||||
// we'll get the latest on this row.
|
||||
List<Cell> result = get(get, false);
|
||||
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
|
||||
boolean matches = false;
|
||||
long cellTs = 0;
|
||||
if (filter != null) {
|
||||
if (!result.isEmpty()) {
|
||||
matches = true;
|
||||
cellTs = result.get(0).getTimestamp();
|
||||
}
|
||||
} else {
|
||||
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
|
||||
if (result.isEmpty() && valueIsNull) {
|
||||
matches = true;
|
||||
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
|
||||
|
@ -4245,6 +4282,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
int compareResult = PrivateCellUtil.compareValue(kv, comparator);
|
||||
matches = matches(op, compareResult);
|
||||
}
|
||||
}
|
||||
|
||||
// If matches put the new put or delete the new delete
|
||||
if (matches) {
|
||||
// We have acquired the row lock already. If the system clock is NOT monotonically
|
||||
|
|
|
@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
|||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
|
@ -613,9 +614,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||
*/
|
||||
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
|
||||
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
|
||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
|
||||
RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
throws IOException {
|
||||
int countOfCompleteMutation = 0;
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
|
@ -658,7 +660,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
|
||||
if (filter != null) {
|
||||
return region.checkAndRowMutate(row, filter, timeRange, rm);
|
||||
} else {
|
||||
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
|
||||
}
|
||||
} finally {
|
||||
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
||||
// even if the malformed cells are not skipped.
|
||||
|
@ -2728,18 +2735,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (request.hasCondition()) {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.getFamily().toByteArray();
|
||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||
CompareOperator op =
|
||||
CompareOperator.valueOf(condition.getCompareType().name());
|
||||
ByteArrayComparable comparator =
|
||||
ProtobufUtil.toComparator(condition.getComparator());
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
processed =
|
||||
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
|
||||
qualifier, op, comparator, timeRange, regionActionResultBuilder,
|
||||
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
|
||||
spaceQuotaEnforcement);
|
||||
} else {
|
||||
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
||||
|
@ -2878,24 +2888,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (request.hasCondition()) {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.getFamily().toByteArray();
|
||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||
CompareOperator compareOp =
|
||||
CompareOperator.valueOf(condition.getCompareType().name());
|
||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
|
||||
compareOp, comparator, put);
|
||||
if (filter != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
|
||||
} else {
|
||||
processed = region.getCoprocessorHost()
|
||||
.preCheckAndPut(row, family, qualifier, op, comparator, put);
|
||||
}
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result = region.checkAndMutate(row, family,
|
||||
qualifier, compareOp, comparator, timeRange, put);
|
||||
boolean result;
|
||||
if (filter != null) {
|
||||
result = region.checkAndMutate(row, filter, timeRange, put);
|
||||
} else {
|
||||
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
|
||||
put);
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndPut(row, family,
|
||||
qualifier, compareOp, comparator, put, result);
|
||||
if (filter != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
|
||||
} else {
|
||||
result = region.getCoprocessorHost()
|
||||
.postCheckAndPut(row, family, qualifier, op, comparator, put, result);
|
||||
}
|
||||
}
|
||||
processed = result;
|
||||
}
|
||||
|
@ -2912,23 +2939,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (request.hasCondition()) {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.getFamily().toByteArray();
|
||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
|
||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
|
||||
comparator, delete);
|
||||
if (filter != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
|
||||
} else {
|
||||
processed = region.getCoprocessorHost()
|
||||
.preCheckAndDelete(row, family, qualifier, op, comparator, delete);
|
||||
}
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result = region.checkAndMutate(row, family,
|
||||
qualifier, op, comparator, timeRange, delete);
|
||||
boolean result;
|
||||
if (filter != null) {
|
||||
result = region.checkAndMutate(row, filter, timeRange, delete);
|
||||
} else {
|
||||
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
|
||||
delete);
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
|
||||
qualifier, op, comparator, delete, result);
|
||||
if (filter != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
|
||||
result);
|
||||
} else {
|
||||
result = region.getCoprocessorHost()
|
||||
.postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
|
||||
}
|
||||
}
|
||||
processed = result;
|
||||
}
|
||||
|
@ -2979,7 +3025,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -330,6 +331,31 @@ public interface Region extends ConfigurationObserver {
|
|||
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the filter and if it does, it performs the mutation. See
|
||||
* checkAndRowMutate to do many checkAndPuts at a time on a single row.
|
||||
* @param row to check
|
||||
* @param filter the filter
|
||||
* @param mutation data to put if check succeeds
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*/
|
||||
default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
|
||||
throws IOException {
|
||||
return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row value matches the filter and if it does, it performs the mutation.
|
||||
* See checkAndRowMutate to do many checkAndPuts at a time on a single row.
|
||||
* @param row to check
|
||||
* @param filter the filter
|
||||
* @param mutation data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*/
|
||||
boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
|
||||
* it performs the row mutations. If the passed value is null, the lack of column value
|
||||
|
@ -367,6 +393,33 @@ public interface Region extends ConfigurationObserver {
|
|||
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the filter and if it does, it performs the row mutations.
|
||||
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
|
||||
* time.
|
||||
* @param row to check
|
||||
* @param filter the filter
|
||||
* @param mutations data to put if check succeeds
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*/
|
||||
default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
|
||||
throws IOException {
|
||||
return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the filter and if it does, it performs the row mutations.
|
||||
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
|
||||
* time.
|
||||
* @param row to check
|
||||
* @param filter the filter
|
||||
* @param mutations data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*/
|
||||
boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
|
||||
RowMutations mutations) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the specified cells/row.
|
||||
* @param delete
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -1079,6 +1080,31 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPut(this, row, filter, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
|
@ -1111,6 +1137,33 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndPutAfterRowLock(
|
||||
final byte[] row, final Filter filter, final Put put) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
|
@ -1137,6 +1190,26 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
|
||||
boolean result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndPut(this, row, filter, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
|
@ -1145,8 +1218,8 @@ public class RegionCoprocessorHost
|
|||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed,
|
||||
* or null otherwise
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final CompareOperator op,
|
||||
|
@ -1168,6 +1241,31 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDelete(this, row, filter, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
|
@ -1200,6 +1298,33 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed,
|
||||
* or null otherwise
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
|
||||
final Delete delete) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
|
@ -1226,6 +1351,26 @@ public class RegionCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
|
||||
boolean result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndDelete(this, row, filter, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param append append object
|
||||
|
|
|
@ -30,6 +30,7 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -41,10 +42,17 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.function.Supplier;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.FamilyFilter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.filter.QualifierFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.filter.TimestampsFilter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -401,6 +409,201 @@ public class TestAsyncTable {
|
|||
assertTrue(ok);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithSingleFilter() throws Throwable {
|
||||
AsyncTable<?> table = getTable.get();
|
||||
|
||||
// Put one row
|
||||
Put put = new Put(row);
|
||||
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
|
||||
table.put(put).get();
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("b")))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
|
||||
.get();
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
|
||||
|
||||
// Delete with success
|
||||
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
|
||||
|
||||
// Mutate with success
|
||||
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("b")))
|
||||
.thenMutate(new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
|
||||
AsyncTable<?> table = getTable.get();
|
||||
|
||||
// Put one row
|
||||
Put put = new Put(row);
|
||||
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
|
||||
table.put(put).get();
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(row, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(row, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("c"))
|
||||
))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
|
||||
.get();
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
|
||||
|
||||
// Delete with success
|
||||
ok = table.checkAndMutate(row, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
|
||||
|
||||
// Mutate with success
|
||||
ok = table.checkAndMutate(row, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenMutate(new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
|
||||
AsyncTable<?> table = getTable.get();
|
||||
|
||||
// Put with specifying the timestamp
|
||||
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(row, new FilterList(
|
||||
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
|
||||
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
|
||||
new TimestampsFilter(Collections.singletonList(100L))
|
||||
))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(row, new FilterList(
|
||||
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
|
||||
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
|
||||
new TimestampsFilter(Collections.singletonList(101L))
|
||||
))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
|
||||
.get();
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
|
||||
AsyncTable<?> table = getTable.get();
|
||||
|
||||
// Put with specifying the timestamp
|
||||
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
|
||||
.get();
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 101))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
|
||||
.get();
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 100))
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
|
||||
.get();
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
|
||||
getTable.get().checkAndMutate(row, FAMILY)
|
||||
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisabled() throws InterruptedException, ExecutionException {
|
||||
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
|
||||
|
|
|
@ -17,13 +17,24 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.FamilyFilter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.filter.QualifierFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.filter.TimestampsFilter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -184,4 +195,183 @@ public class TestCheckAndMutate {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithSingleFilter() throws Throwable {
|
||||
try (Table table = createTable()) {
|
||||
// put one row
|
||||
putOneRow(table);
|
||||
// get row back and assert the values
|
||||
getOneRowAndAssertAllExist(table);
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
|
||||
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("b")))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
|
||||
|
||||
// Delete with success
|
||||
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
|
||||
assertTrue(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Mutate with success
|
||||
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("b")))
|
||||
.thenMutate(new RowMutations(ROWKEY)
|
||||
.add((Mutation) new Put(ROWKEY)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
|
||||
assertTrue(ok);
|
||||
|
||||
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
|
||||
try (Table table = createTable()) {
|
||||
// put one row
|
||||
putOneRow(table);
|
||||
// get row back and assert the values
|
||||
getOneRowAndAssertAllExist(table);
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("c"))
|
||||
))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
|
||||
|
||||
// Delete with success
|
||||
ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
|
||||
assertTrue(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Mutate with success
|
||||
ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
))
|
||||
.thenMutate(new RowMutations(ROWKEY)
|
||||
.add((Mutation) new Put(ROWKEY)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
|
||||
assertTrue(ok);
|
||||
|
||||
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
|
||||
try (Table table = createTable()) {
|
||||
// Put with specifying the timestamp
|
||||
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
|
||||
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
|
||||
new TimestampsFilter(Collections.singletonList(100L))
|
||||
))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(ROWKEY, new FilterList(
|
||||
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
|
||||
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
|
||||
new TimestampsFilter(Collections.singletonList(101L))
|
||||
))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
|
||||
try (Table table = createTable()) {
|
||||
// Put with specifying the timestamp
|
||||
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
|
||||
|
||||
// Put with success
|
||||
boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
|
||||
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 101))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
|
||||
CompareOperator.EQUAL, Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 100))
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
|
||||
try (Table table = createTable()) {
|
||||
table.checkAndMutate(ROWKEY, FAMILY)
|
||||
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,12 +30,12 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
|
@ -238,8 +238,7 @@ public class TestMalformedCellFromClient {
|
|||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
||||
ClientProtos.Condition condition = RequestConverter
|
||||
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
|
||||
HBaseProtos.CompareType.EQUAL, null);
|
||||
.buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
|
||||
for (Mutation mutation : rm.getMutations()) {
|
||||
ClientProtos.MutationProto.MutationType mutateType = null;
|
||||
if (mutation instanceof Put) {
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -99,11 +100,17 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
final AtomicInteger ctPostIncrement = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostAppend = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndPutWithFilter = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndPutWithFilterAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndPutWithFilter = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndDeleteWithFilter = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
|
||||
|
@ -492,6 +499,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
ctPreCheckAndPutWithFilter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
|
||||
|
@ -500,6 +514,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Put put, boolean result) throws IOException {
|
||||
ctPreCheckAndPutWithFilterAfterRowLock.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
|
||||
|
@ -508,6 +529,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
ctPostCheckAndPutWithFilter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
|
||||
|
@ -516,6 +544,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
ctPreCheckAndDeleteWithFilter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
|
||||
|
@ -524,6 +559,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
|
||||
ctPreCheckAndDeleteWithFilterAfterRowLock.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
|
||||
|
@ -532,6 +574,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
ctPostCheckAndDeleteWithFilter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Append append) throws IOException {
|
||||
|
@ -693,28 +742,52 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return ctPostCloseRegionOperation.get();
|
||||
}
|
||||
|
||||
public boolean hadPreCheckAndPut() {
|
||||
return ctPreCheckAndPut.get() > 0;
|
||||
public int getPreCheckAndPut() {
|
||||
return ctPreCheckAndPut.get();
|
||||
}
|
||||
|
||||
public boolean hadPreCheckAndPutAfterRowLock() {
|
||||
return ctPreCheckAndPutAfterRowLock.get() > 0;
|
||||
public int getPreCheckAndPutWithFilter() {
|
||||
return ctPreCheckAndPutWithFilter.get();
|
||||
}
|
||||
|
||||
public boolean hadPostCheckAndPut() {
|
||||
return ctPostCheckAndPut.get() > 0;
|
||||
public int getPreCheckAndPutAfterRowLock() {
|
||||
return ctPreCheckAndPutAfterRowLock.get();
|
||||
}
|
||||
|
||||
public boolean hadPreCheckAndDelete() {
|
||||
return ctPreCheckAndDelete.get() > 0;
|
||||
public int getPreCheckAndPutWithFilterAfterRowLock() {
|
||||
return ctPreCheckAndPutWithFilterAfterRowLock.get();
|
||||
}
|
||||
|
||||
public boolean hadPreCheckAndDeleteAfterRowLock() {
|
||||
return ctPreCheckAndDeleteAfterRowLock.get() > 0;
|
||||
public int getPostCheckAndPut() {
|
||||
return ctPostCheckAndPut.get();
|
||||
}
|
||||
|
||||
public boolean hadPostCheckAndDelete() {
|
||||
return ctPostCheckAndDelete.get() > 0;
|
||||
public int getPostCheckAndPutWithFilter() {
|
||||
return ctPostCheckAndPutWithFilter.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndDelete() {
|
||||
return ctPreCheckAndDelete.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndDeleteWithFilter() {
|
||||
return ctPreCheckAndDeleteWithFilter.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndDeleteAfterRowLock() {
|
||||
return ctPreCheckAndDeleteAfterRowLock.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndDeleteWithFilterAfterRowLock() {
|
||||
return ctPreCheckAndDeleteWithFilterAfterRowLock.get();
|
||||
}
|
||||
|
||||
public int getPostCheckAndDelete() {
|
||||
return ctPostCheckAndDelete.get();
|
||||
}
|
||||
|
||||
public int getPostCheckAndDeleteWithFilter() {
|
||||
return ctPostCheckAndDeleteWithFilter.get();
|
||||
}
|
||||
|
||||
public boolean hadPreIncrement() {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.filter.FilterAllFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
|
@ -263,12 +265,26 @@ public class TestRegionObserverInterface {
|
|||
p = new Put(Bytes.toBytes(0));
|
||||
p.addColumn(A, A, A);
|
||||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
|
||||
tableName, new Boolean[] { false, false, false });
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
|
||||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
|
||||
tableName, new Boolean[] { true, true, true });
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0),
|
||||
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
|
||||
.thenPut(p);
|
||||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
|
||||
} finally {
|
||||
util.deleteTable(tableName);
|
||||
}
|
||||
|
@ -285,14 +301,29 @@ public class TestRegionObserverInterface {
|
|||
Delete d = new Delete(Bytes.toBytes(0));
|
||||
table.delete(d);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
|
||||
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
|
||||
tableName, new Boolean[] { false, false, false });
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
|
||||
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
|
||||
tableName, new Boolean[] { true, true, true });
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0),
|
||||
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
|
||||
.thenDelete(d);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
|
||||
} finally {
|
||||
util.deleteTable(tableName);
|
||||
table.close();
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
|
|||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
|
||||
/**
|
||||
|
@ -273,6 +274,11 @@ public class RegionAsTable implements Table {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mutateRow(RowMutations rm) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
|
@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
|
|||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.filter.SubstringComparator;
|
||||
import org.apache.hadoop.hbase.filter.ValueFilter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
|
@ -2149,6 +2150,128 @@ public class TestHRegion {
|
|||
assertEquals(0, r.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutate_WithFilters() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, FAMILY);
|
||||
|
||||
// Put one row
|
||||
Put put = new Put(row);
|
||||
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
|
||||
region.put(put);
|
||||
|
||||
// Put with success
|
||||
boolean ok = region.checkAndMutate(row,
|
||||
new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
),
|
||||
new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
ok = region.checkAndMutate(row,
|
||||
new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("c"))
|
||||
),
|
||||
new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
|
||||
|
||||
// Delete with success
|
||||
ok = region.checkAndMutate(row,
|
||||
new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
),
|
||||
new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
|
||||
assertTrue(ok);
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
|
||||
|
||||
// Mutate with success
|
||||
ok = region.checkAndRowMutate(row,
|
||||
new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))
|
||||
),
|
||||
new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
|
||||
assertTrue(ok);
|
||||
|
||||
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
|
||||
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, FAMILY);
|
||||
|
||||
// Put with specifying the timestamp
|
||||
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
|
||||
|
||||
// Put with success
|
||||
boolean ok = region.checkAndMutate(row,
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
TimeRange.between(0, 101),
|
||||
new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
|
||||
assertTrue(ok);
|
||||
|
||||
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
ok = region.checkAndMutate(row,
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
TimeRange.between(0, 100),
|
||||
new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
|
||||
assertFalse(ok);
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
|
||||
|
||||
// Mutate with success
|
||||
ok = region.checkAndRowMutate(row,
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
TimeRange.between(0, 101),
|
||||
new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
|
||||
assertTrue(ok);
|
||||
|
||||
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
|
||||
}
|
||||
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
// Delete tests
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
|
||||
|
@ -428,6 +429,11 @@ public class ThriftTable implements Table {
|
|||
return new CheckAndMutateBuilderImpl(row, family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
|
||||
throw new NotImplementedException("Implement later");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mutateRow(RowMutations rm) throws IOException {
|
||||
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
|
||||
|
|
Loading…
Reference in New Issue