diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index bfcc1870631..e10f1f82073 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -29,6 +29,7 @@ import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -289,6 +290,60 @@ public interface AsyncTable { CompletableFuture thenMutate(RowMutations mutation); } + /** + * Atomically checks if a row matches the specified filter. If it does, it adds the + * Put/Delete/RowMutations. + *

+ * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then + * execute it. This is a fluent style API, the code is like: + * + *

+   * 
+   * table.checkAndMutate(row, filter).thenPut(put)
+   *     .thenAccept(succ -> {
+   *       if (succ) {
+   *         System.out.println("Check and put succeeded");
+   *       } else {
+   *         System.out.println("Check and put failed");
+   *       }
+   *     });
+   * 
+   * 
+ */ + CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); + + /** + * A helper class for sending checkAndMutate request with a filter. + */ + interface CheckAndMutateWithFilterBuilder { + + /** + * @param timeRange time range to check. + */ + CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange); + + /** + * @param put data to put if check succeeds + * @return {@code true} if the new put was executed, {@code false} otherwise. The return value + * will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenPut(Put put); + + /** + * @param delete data to delete if check succeeds + * @return {@code true} if the new delete was executed, {@code false} otherwise. The return + * value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenDelete(Delete delete); + + /** + * @param mutation mutations to perform if check succeeds + * @return true if the new mutation was executed, false otherwise. The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture thenMutate(RowMutations mutation); + } + /** * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 2256a4c1a89..d6406b6430d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -29,6 +29,7 @@ import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -173,6 +174,36 @@ class AsyncTableImpl implements AsyncTable { }; } + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return new CheckAndMutateWithFilterBuilder() { + + private final CheckAndMutateWithFilterBuilder builder = + rawTable.checkAndMutate(row, filter); + + @Override + public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { + builder.timeRange(timeRange); + return this; + } + + @Override + public CompletableFuture thenPut(Put put) { + return wrap(builder.thenPut(put)); + } + + @Override + public CompletableFuture thenDelete(Delete delete) { + return wrap(builder.thenDelete(delete)); + } + + @Override + public CompletableFuture thenMutate(RowMutations mutation) { + return wrap(builder.thenMutate(mutation)); + } + }; + } + @Override public CompletableFuture mutateRow(RowMutations mutation) { return wrap(rawTable.mutateRow(mutation)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index c357b1f761d..ebdde4041c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; -import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRespo import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; /** * The implementation of RawAsyncTable. @@ -320,10 +319,10 @@ class RawAsyncTableImpl implements AsyncTable { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, p), (c, r) -> r.getProcessed())) .call(); } @@ -332,10 +331,10 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenDelete(Delete delete) { preCheck(); return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, d), (c, r) -> r.getProcessed())) .call(); } @@ -343,12 +342,12 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); - return RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.this. mutateRow(controller, + return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), + rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, + null, timeRange, rm), resp -> resp.getExists())) .call(); } @@ -359,6 +358,68 @@ class RawAsyncTableImpl implements AsyncTable { return new CheckAndMutateBuilderImpl(row, family); } + + private final class CheckAndMutateWithFilterBuilderImpl + implements CheckAndMutateWithFilterBuilder { + + private final byte[] row; + + private final Filter filter; + + private TimeRange timeRange; + + public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { + this.row = Preconditions.checkNotNull(row, "row is null"); + this.filter = Preconditions.checkNotNull(filter, "filter is null"); + } + + @Override + public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + @Override + public CompletableFuture thenPut(Put put) { + validatePut(put, conn.connConf.getMaxKeyValueSize()); + return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, + stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, + filter, timeRange, p), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture thenDelete(Delete delete) { + return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, + loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, + filter, timeRange, d), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture thenMutate(RowMutations mutation) { + return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), + rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, + loc, stub, mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, + filter, timeRange, rm), + resp -> resp.getExists())) + .call(); + } + } + + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return new CheckAndMutateWithFilterBuilderImpl(row, filter); + } + // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // so here I write a new method as I do not want to change the abstraction of call method. private CompletableFuture mutateRow(HBaseRpcController controller, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 41b0e47c075..e600e7756ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; @@ -355,6 +356,53 @@ public interface Table extends Closeable { * @return {@code true} if the new delete was executed, {@code false} otherwise. */ boolean thenDelete(Delete delete) throws IOException; + + /** + * @param mutation mutations to perform if check succeeds + * @return true if the new mutation was executed, false otherwise. + */ + boolean thenMutate(RowMutations mutation) throws IOException; + } + + /** + * Atomically checks if a row matches the specified filter. If it does, it adds the + * Put/Delete/RowMutations. + *

+ * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then + * execute it. This is a fluent style API, the code is like: + * + *

+   * 
+   * table.checkAndMutate(row, filter).thenPut(put);
+   * 
+   * 
+ */ + default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + throw new NotImplementedException("Add an implementation!"); + } + + /** + * A helper class for sending checkAndMutate request with a filter. + */ + interface CheckAndMutateWithFilterBuilder { + + /** + * @param timeRange timeRange to check + */ + CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange); + + /** + * @param put data to put if check succeeds + * @return {@code true} if the new put was executed, {@code false} otherwise. + */ + boolean thenPut(Put put) throws IOException; + + /** + * @param delete data to delete if check succeeds + * @return {@code true} if the new delete was executed, {@code false} otherwise. + */ + boolean thenDelete(Delete delete) throws IOException; + /** * @param mutation mutations to perform if check succeeds * @return true if the new mutation was executed, false otherwise. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 0a2a66eecae..283586ae5e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; @@ -220,58 +221,80 @@ class TableOverAsyncTable implements Table { FutureUtils.get(table.deleteAll(deletes)); } - private static final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { + @Override + public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { + return new CheckAndMutateBuilder() { - private final AsyncTable.CheckAndMutateBuilder builder; + private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family); - public CheckAndMutateBuilderImpl( - org.apache.hadoop.hbase.client.AsyncTable.CheckAndMutateBuilder builder) { - this.builder = builder; - } + @Override + public CheckAndMutateBuilder qualifier(byte[] qualifier) { + builder.qualifier(qualifier); + return this; + } - @Override - public CheckAndMutateBuilder qualifier(byte[] qualifier) { - builder.qualifier(qualifier); - return this; - } + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + builder.timeRange(timeRange); + return this; + } - @Override - public CheckAndMutateBuilder timeRange(TimeRange timeRange) { - builder.timeRange(timeRange); - return this; - } + @Override + public CheckAndMutateBuilder ifNotExists() { + builder.ifNotExists(); + return this; + } - @Override - public CheckAndMutateBuilder ifNotExists() { - builder.ifNotExists(); - return this; - } + @Override + public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { + builder.ifMatches(compareOp, value); + return this; + } - @Override - public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { - builder.ifMatches(compareOp, value); - return this; - } + @Override + public boolean thenPut(Put put) throws IOException { + return FutureUtils.get(builder.thenPut(put)); + } - @Override - public boolean thenPut(Put put) throws IOException { - return FutureUtils.get(builder.thenPut(put)); - } + @Override + public boolean thenDelete(Delete delete) throws IOException { + return FutureUtils.get(builder.thenDelete(delete)); + } - @Override - public boolean thenDelete(Delete delete) throws IOException { - return FutureUtils.get(builder.thenDelete(delete)); - } - - @Override - public boolean thenMutate(RowMutations mutation) throws IOException { - return FutureUtils.get(builder.thenMutate(mutation)); - } + @Override + public boolean thenMutate(RowMutations mutation) throws IOException { + return FutureUtils.get(builder.thenMutate(mutation)); + } + }; } @Override - public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { - return new CheckAndMutateBuilderImpl(table.checkAndMutate(row, family)); + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return new CheckAndMutateWithFilterBuilder() { + private final AsyncTable.CheckAndMutateWithFilterBuilder builder = + table.checkAndMutate(row, filter); + + @Override + public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { + builder.timeRange(timeRange); + return this; + } + + @Override + public boolean thenPut(Put put) throws IOException { + return FutureUtils.get(builder.thenPut(put)); + } + + @Override + public boolean thenDelete(Delete delete) throws IOException { + return FutureUtils.get(builder.thenDelete(delete)); + } + + @Override + public boolean thenMutate(RowMutations mutation) throws IOException { + return FutureUtils.get(builder.thenMutate(mutation)); + } + }; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 075829c2e39..4b80f1d38d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -30,6 +30,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -53,7 +54,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -187,71 +189,51 @@ public final class RequestConverter { /** * Create a protocol buffer MutateRequest for a conditioned put * - * @param regionName - * @param row - * @param family - * @param qualifier - * @param comparator - * @param compareType - * @param put * @return a mutate request * @throws IOException */ public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, TimeRange timeRange, final Put put) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange - , put, MutationType.PUT); + final byte[] regionName, final byte[] row, final byte[] family, + final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, + final TimeRange timeRange, final Put put) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, + put, MutationType.PUT); } /** * Create a protocol buffer MutateRequest for a conditioned delete * - * @param regionName - * @param row - * @param family - * @param qualifier - * @param comparator - * @param compareType - * @param delete * @return a mutate request * @throws IOException */ public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange - , delete, MutationType.DELETE); + final byte[] regionName, final byte[] row, final byte[] family, + final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, + final TimeRange timeRange, final Delete delete) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, + delete, MutationType.DELETE); } public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, - final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, TimeRange timeRange, final Mutation mutation, + final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, + final Filter filter, final TimeRange timeRange, final Mutation mutation, final MutationType type) throws IOException { return MutateRequest.newBuilder() .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) .setMutation(ProtobufUtil.toMutation(type, mutation)) - .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) .build(); } + /** * Create a protocol buffer MutateRequest for conditioned row mutations * - * @param regionName - * @param row - * @param family - * @param qualifier - * @param comparator - * @param compareType - * @param rowMutations * @return a mutate request * @throws IOException */ public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName, final byte[] row, final byte[] family, final byte[] qualifier, - final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange, + final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange, final RowMutations rowMutations) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); @@ -259,7 +241,7 @@ public final class RequestConverter { ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { - MutationType mutateType = null; + MutationType mutateType; if (mutation instanceof Put) { mutateType = MutationType.PUT; } else if (mutation instanceof Delete) { @@ -275,7 +257,7 @@ public final class RequestConverter { builder.addAction(actionBuilder.build()); } return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) .build(); } @@ -918,25 +900,26 @@ public final class RequestConverter { /** * Create a protocol buffer Condition * - * @param row - * @param family - * @param qualifier - * @param comparator - * @param compareType * @return a Condition * @throws IOException */ public static Condition buildCondition(final byte[] row, final byte[] family, - final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType, - final TimeRange timeRange) { - return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row)) - .setFamily(UnsafeByteOperations.unsafeWrap(family)) - .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ? - HConstants.EMPTY_BYTE_ARRAY : qualifier)) - .setComparator(ProtobufUtil.toComparator(comparator)) - .setCompareType(compareType) - .setTimeRange(ProtobufUtil.toTimeRange(timeRange)) - .build(); + final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, + final TimeRange timeRange) throws IOException { + + Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row)); + + if (filter != null) { + builder.setFilter(ProtobufUtil.toFilter(filter)); + } else { + builder.setFamily(UnsafeByteOperations.unsafeWrap(family)) + .setQualifier(UnsafeByteOperations.unsafeWrap( + qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)) + .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value))) + .setCompareType(CompareType.valueOf(op.name())); + } + + return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build(); } /** diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index a22c6237bc7..810aaaa393a 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -139,11 +139,12 @@ message GetResponse { */ message Condition { required bytes row = 1; - required bytes family = 2; - required bytes qualifier = 3; - required CompareType compare_type = 4; - required Comparator comparator = 5; + optional bytes family = 2; + optional bytes qualifier = 3; + optional CompareType compare_type = 4; + optional Comparator comparator = 5; optional TimeRange time_range = 6; + optional Filter filter = 7; } diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 5fd20c81911..59cb0e25995 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -138,11 +138,12 @@ message GetResponse { */ message Condition { required bytes row = 1; - required bytes family = 2; - required bytes qualifier = 3; - required CompareType compare_type = 4; - required Comparator comparator = 5; + optional bytes family = 2; + optional bytes qualifier = 3; + optional CompareType compare_type = 4; + optional Comparator comparator = 5; optional TimeRange time_range = 6; + optional Filter filter = 7; } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 9e79c067516..c3fc819c6c0 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.rest.Constants; @@ -737,6 +739,11 @@ public class RemoteHTable implements Table { return new CheckAndMutateBuilderImpl(row, family); } + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + throw new NotImplementedException("Implement later"); + } + @Override public Result increment(Increment increment) throws IOException { throw new IOException("Increment not supported"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 8761d6b1d9d..05071fc5c4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -511,9 +512,8 @@ public interface RegionObserver { * @param op the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @param result - * @return the return value to return to client if bypassing default - * processing + * @param result the default value of the result + * @return the return value to return to client if bypassing default processing */ default boolean preCheckAndPut(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, @@ -521,6 +521,26 @@ public interface RegionObserver { return result; } + /** + * Called before checkAndPut. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @param result the default value of the result + * @return the return value to return to client if bypassing default processing + */ + default boolean preCheckAndPut(ObserverContext c, byte[] row, + Filter filter, Put put, boolean result) throws IOException { + return result; + } + /** * Called before checkAndPut but after acquiring rowlock. *

@@ -540,9 +560,8 @@ public interface RegionObserver { * @param op the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @param result - * @return the return value to return to client if bypassing default - * processing + * @param result the default value of the result + * @return the return value to return to client if bypassing default processing */ default boolean preCheckAndPutAfterRowLock(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, @@ -550,6 +569,30 @@ public interface RegionObserver { return result; } + /** + * Called before checkAndPut but after acquiring rowlock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @param result the default value of the result + * @return the return value to return to client if bypassing default processing + */ + default boolean preCheckAndPutAfterRowLock(ObserverContext c, + byte[] row, Filter filter, Put put, boolean result) throws IOException { + return result; + } + /** * Called after checkAndPut *

@@ -571,6 +614,23 @@ public interface RegionObserver { return result; } + /** + * Called after checkAndPut + *

+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @param result from the checkAndPut + * @return the possibly transformed return value to return to client + */ + default boolean postCheckAndPut(ObserverContext c, byte[] row, + Filter filter, Put put, boolean result) throws IOException { + return result; + } + /** * Called before checkAndDelete. *

@@ -586,7 +646,7 @@ public interface RegionObserver { * @param op the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @param result + * @param result the default value of the result * @return the value to return to client if bypassing default processing */ default boolean preCheckAndDelete(ObserverContext c, byte[] row, @@ -595,6 +655,26 @@ public interface RegionObserver { return result; } + /** + * Called before checkAndDelete. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter column family + * @param delete delete to commit if check succeeds + * @param result the default value of the result + * @return the value to return to client if bypassing default processing + */ + default boolean preCheckAndDelete(ObserverContext c, byte[] row, + Filter filter, Delete delete, boolean result) throws IOException { + return result; + } + /** * Called before checkAndDelete but after acquiring rowock. *

@@ -614,7 +694,7 @@ public interface RegionObserver { * @param op the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @param result + * @param result the default value of the result * @return the value to return to client if bypassing default processing */ default boolean preCheckAndDeleteAfterRowLock(ObserverContext c, @@ -623,6 +703,30 @@ public interface RegionObserver { return result; } + /** + * Called before checkAndDelete but after acquiring rowock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter filter + * @param delete delete to commit if check succeeds + * @param result the default value of the result + * @return the value to return to client if bypassing default processing + */ + default boolean preCheckAndDeleteAfterRowLock(ObserverContext c, + byte[] row, Filter filter, Delete delete, boolean result) throws IOException { + return result; + } + /** * Called after checkAndDelete *

@@ -644,6 +748,23 @@ public interface RegionObserver { return result; } + /** + * Called after checkAndDelete + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param row row to check + * @param filter filter + * @param delete delete to commit if check succeeds + * @param result from the CheckAndDelete + * @return the possibly transformed returned value to return to client + */ + default boolean postCheckAndDelete(ObserverContext c, byte[] row, + Filter filter, Delete delete, boolean result) throws IOException { + return result; + } + /** * Called before Append. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e1c55ec8b59..c0ad87cef7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterWrapper; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HFileLink; @@ -162,8 +163,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.CompressionTest; -import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; @@ -4205,13 +4204,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException { checkMutationType(mutation, row); - return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation); + return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null, + mutation); + } + + @Override + public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation) + throws IOException { + return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation); } @Override public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException { - return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null); + return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null); + } + + @Override + public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm) + throws IOException { + return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null); } /** @@ -4219,7 +4231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * switches in the few places where there is deviation. */ private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, + CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange, RowMutations rowMutations, Mutation mutation) throws IOException { // Could do the below checks but seems wacky with two callers only. Just comment out for now. @@ -4233,8 +4245,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(); try { Get get = new Get(row); - checkFamily(family); - get.addColumn(family, qualifier); + if (family != null) { + checkFamily(family); + get.addColumn(family, qualifier); + } + if (filter != null) { + get.setFilter(filter); + } if (timeRange != null) { get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } @@ -4246,11 +4263,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Call coprocessor. Boolean processed = null; if (mutation instanceof Put) { - processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, - qualifier, op, comparator, (Put)mutation); + if (filter != null) { + processed = this.getCoprocessorHost() + .preCheckAndPutAfterRowLock(row, filter, (Put) mutation); + } else { + processed = this.getCoprocessorHost() + .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator, + (Put) mutation); + } } else if (mutation instanceof Delete) { - processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, - qualifier, op, comparator, (Delete)mutation); + if (filter != null) { + processed = this.getCoprocessorHost() + .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation); + } else { + processed = this.getCoprocessorHost() + .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator, + (Delete) mutation); + } } if (processed != null) { return processed; @@ -4260,20 +4289,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Supposition is that now all changes are done under row locks, then when we go to read, // we'll get the latest on this row. List result = get(get, false); - boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; long cellTs = 0; - if (result.isEmpty() && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = PrivateCellUtil.compareValue(kv, comparator); - matches = matches(op, compareResult); + if (filter != null) { + if (!result.isEmpty()) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } + } else { + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; + if (result.isEmpty() && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { + matches = true; + cellTs = result.get(0).getTimestamp(); + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + cellTs = kv.getTimestamp(); + int compareResult = PrivateCellUtil.compareValue(kv, comparator); + matches = matches(op, compareResult); + } } + // If matches put the new put or delete the new delete if (matches) { // We have acquired the row lock already. If the system clock is NOT monotonically diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 21e8313869b..b2404ca43a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -609,9 +610,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. */ private boolean checkAndRowMutate(final HRegion region, final List actions, - final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, - ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder, - ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { + final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, + CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange, + RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement) + throws IOException { int countOfCompleteMutation = 0; try { if (!region.getRegionInfo().isMetaRegion()) { @@ -654,7 +656,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException( resultOrExceptionOrBuilder.build()); } - return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); + + if (filter != null) { + return region.checkAndRowMutate(row, filter, timeRange, rm); + } else { + return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); + } } finally { // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner // even if the malformed cells are not skipped. @@ -2775,18 +2782,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator op = - CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = - ProtobufUtil.toComparator(condition.getComparator()); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = condition.hasQualifier() ? + condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = condition.hasFilter() ? + ProtobufUtil.toFilter(condition.getFilter()) : null; TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, - qualifier, op, comparator, timeRange, regionActionResultBuilder, + qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, spaceQuotaEnforcement); } else { doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), @@ -2935,24 +2945,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator compareOp = - CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = condition.hasQualifier() ? + condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = condition.hasFilter() ? + ProtobufUtil.toFilter(condition.getFilter()) : null; TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, - compareOp, comparator, put); + if (filter != null) { + processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put); + } else { + processed = region.getCoprocessorHost() + .preCheckAndPut(row, family, qualifier, op, comparator, put); + } } if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, compareOp, comparator, timeRange, put); + boolean result; + if (filter != null) { + result = region.checkAndMutate(row, filter, timeRange, put); + } else { + result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, + put); + } if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndPut(row, family, - qualifier, compareOp, comparator, put, result); + if (filter != null) { + result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result); + } else { + result = region.getCoprocessorHost() + .postCheckAndPut(row, family, qualifier, op, comparator, put, result); + } } processed = result; } @@ -2969,23 +2996,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = condition.hasQualifier() ? + condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = condition.hasFilter() ? + ProtobufUtil.toFilter(condition.getFilter()) : null; TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, - comparator, delete); + if (filter != null) { + processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete); + } else { + processed = region.getCoprocessorHost() + .preCheckAndDelete(row, family, qualifier, op, comparator, delete); + } } if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, op, comparator, timeRange, delete); + boolean result; + if (filter != null) { + result = region.checkAndMutate(row, filter, timeRange, delete); + } else { + result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, + delete); + } if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndDelete(row, family, - qualifier, op, comparator, delete, result); + if (filter != null) { + result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete, + result); + } else { + result = region.getCoprocessorHost() + .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result); + } } processed = result; } @@ -3036,7 +3082,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, break; default: break; - } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index e684c26181d..8478a73ddbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.yetus.audience.InterfaceAudience; @@ -333,6 +334,31 @@ public interface Region extends ConfigurationObserver { boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException; + /** + * Atomically checks if a row matches the filter and if it does, it performs the mutation. See + * checkAndRowMutate to do many checkAndPuts at a time on a single row. + * @param row to check + * @param filter the filter + * @param mutation data to put if check succeeds + * @return true if mutation was applied, false otherwise + */ + default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation) + throws IOException { + return checkAndMutate(row, filter, TimeRange.allTime(), mutation); + } + + /** + * Atomically checks if a row value matches the filter and if it does, it performs the mutation. + * See checkAndRowMutate to do many checkAndPuts at a time on a single row. + * @param row to check + * @param filter the filter + * @param mutation data to put if check succeeds + * @param timeRange time range to check + * @return true if mutation was applied, false otherwise + */ + boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation) + throws IOException; + /** * Atomically checks if a row/family/qualifier value matches the expected values and if it does, * it performs the row mutations. If the passed value is null, the lack of column value @@ -370,6 +396,33 @@ public interface Region extends ConfigurationObserver { ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations) throws IOException; + /** + * Atomically checks if a row matches the filter and if it does, it performs the row mutations. + * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a + * time. + * @param row to check + * @param filter the filter + * @param mutations data to put if check succeeds + * @return true if mutations were applied, false otherwise + */ + default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations) + throws IOException { + return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations); + } + + /** + * Atomically checks if a row matches the filter and if it does, it performs the row mutations. + * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a + * time. + * @param row to check + * @param filter the filter + * @param mutations data to put if check succeeds + * @param timeRange time range to check + * @return true if mutations were applied, false otherwise + */ + boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange, + RowMutations mutations) throws IOException; + /** * Deletes the specified cells/row. * @param delete diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 4468f571dc5..67f013368d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -1079,6 +1080,31 @@ public class RegionCoprocessorHost }); } + /** + * Supports Coprocessor 'bypass'. + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise + */ + public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put) + throws IOException { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) { + return null; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndPut(this, row, filter, put, getResult()); + } + }); + } + /** * Supports Coprocessor 'bypass'. * @param row row to check @@ -1088,7 +1114,7 @@ public class RegionCoprocessorHost * @param comparator the comparator * @param put data to put if check succeeds * @return true or false to return to client if default processing should be bypassed, or null - * otherwise + * otherwise */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", justification="Null is legit") @@ -1111,6 +1137,33 @@ public class RegionCoprocessorHost }); } + /** + * Supports Coprocessor 'bypass'. + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", + justification="Null is legit") + public Boolean preCheckAndPutAfterRowLock( + final byte[] row, final Filter filter, final Put put) throws IOException { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) { + return null; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult()); + } + }); + } + /** * @param row row to check * @param family column family @@ -1137,6 +1190,26 @@ public class RegionCoprocessorHost }); } + /** + * @param row row to check + * @param filter filter + * @param put data to put if check succeeds + * @throws IOException e + */ + public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put, + boolean result) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return result; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postCheckAndPut(this, row, filter, put, getResult()); + } + }); + } + /** * Supports Coprocessor 'bypass'. * @param row row to check @@ -1145,8 +1218,8 @@ public class RegionCoprocessorHost * @param op the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should be bypassed, - * or null otherwise + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise */ public Boolean preCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, @@ -1168,6 +1241,31 @@ public class RegionCoprocessorHost }); } + /** + * Supports Coprocessor 'bypass'. + * @param row row to check + * @param filter filter + * @param delete delete to commit if check succeeds + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise + */ + public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete) + throws IOException { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) { + return null; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndDelete(this, row, filter, delete, getResult()); + } + }); + } + /** * Supports Coprocessor 'bypass'. * @param row row to check @@ -1200,6 +1298,33 @@ public class RegionCoprocessorHost }); } + /** + * Supports Coprocessor 'bypass'. + * @param row row to check + * @param filter filter + * @param delete delete to commit if check succeeds + * @return true or false to return to client if default processing should be bypassed, + * or null otherwise + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", + justification="Null is legit") + public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter, + final Delete delete) throws IOException { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) { + return null; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult()); + } + }); + } + /** * @param row row to check * @param family column family @@ -1226,6 +1351,26 @@ public class RegionCoprocessorHost }); } + /** + * @param row row to check + * @param filter filter + * @param delete delete to commit if check succeeds + * @throws IOException e + */ + public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete, + boolean result) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return result; + } + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { + @Override + public Boolean call(RegionObserver observer) throws IOException { + return observer.postCheckAndDelete(this, row, filter, delete, getResult()); + } + }); + } + /** * Supports Coprocessor 'bypass'. * @param append append object diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index 2e9bb74eca1..755d2a31928 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.Filter; /** * Can be overridden in UT if you only want to implement part of the methods in {@link AsyncTable}. @@ -105,6 +106,11 @@ public class DummyAsyncTable implements AsyncT return null; } + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + return null; + } + @Override public CompletableFuture mutateRow(RowMutations mutation) { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 63080b9a34d..b9fb8119575 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -30,6 +30,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -41,10 +42,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.TimestampsFilter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -401,6 +409,201 @@ public class TestAsyncTable { assertTrue(ok); } + @Test + public void testCheckAndMutateWithSingleFilter() throws Throwable { + AsyncTable table = getTable.get(); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put).get(); + + // Put with success + boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) + .get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); + + // Delete with success + ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))) + .get(); + assertTrue(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); + + // Mutate with success + ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .thenMutate(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) + .get(); + assertTrue(ok); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + } + + @Test + public void testCheckAndMutateWithMultipleFilters() throws Throwable { + AsyncTable table = getTable.get(); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put).get(); + + // Put with success + boolean ok = table.checkAndMutate(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")) + )) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) + .get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); + + // Delete with success + ok = table.checkAndMutate(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))) + .get(); + assertTrue(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); + + // Mutate with success + ok = table.checkAndMutate(row, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenMutate(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) + .get(); + assertTrue(ok); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + } + + @Test + public void testCheckAndMutateWithTimestampFilter() throws Throwable { + AsyncTable table = getTable.get(); + + // Put with specifying the timestamp + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); + + // Put with success + boolean ok = table.checkAndMutate(row, new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(100L)) + )) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))) + .get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(row, new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(101L)) + )) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + } + + @Test + public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + AsyncTable table = getTable.get(); + + // Put with specifying the timestamp + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))) + .get(); + + // Put with success + boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))) + .get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 100)) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + } + + @Test(expected = NullPointerException.class) + public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { + getTable.get().checkAndMutate(row, FAMILY) + .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + } + @Test public void testDisabled() throws InterruptedException, ExecutionException { ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java index 53353d2d427..f399e8619f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -18,14 +18,25 @@ package org.apache.hadoop.hbase.client; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Collections; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.TimestampsFilter; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -182,4 +193,183 @@ public class TestCheckAndMutate { } } + @Test + public void testCheckAndMutateWithSingleFilter() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // Put with success + boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, + Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a"))) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E")))); + + // Delete with success + ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))); + assertTrue(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")))); + + // Mutate with success + ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .thenMutate(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))); + assertTrue(ok); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + } + } + + @Test + public void testCheckAndMutateWithMultipleFilters() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // Put with success + boolean ok = table.checkAndMutate(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")) + )) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E")))); + + // Delete with success + ok = table.checkAndMutate(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))); + assertTrue(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")))); + + // Mutate with success + ok = table.checkAndMutate(ROWKEY, new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + )) + .thenMutate(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))); + assertTrue(ok); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + } + } + + @Test + public void testCheckAndMutateWithTimestampFilter() throws Throwable { + try (Table table = createTable()) { + // Put with specifying the timestamp + table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + boolean ok = table.checkAndMutate(ROWKEY, new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(100L)) + )) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(ROWKEY, new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(101L)) + )) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + } + } + + @Test + public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + try (Table table = createTable()) { + // Put with specifying the timestamp + table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, + Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 100)) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + } + } + + @Test(expected = NullPointerException.class) + public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { + try (Table table = createTable()) { + table.checkAndMutate(ROWKEY, FAMILY) + .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index ab7d0704048..655225a776f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -30,12 +30,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -234,8 +234,7 @@ public class TestMalformedCellFromClient { ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); ClientProtos.Condition condition = RequestConverter - .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]), - HBaseProtos.CompareType.EQUAL, null); + .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null); for (Mutation mutation : rm.getMutations()) { ClientProtos.MutationProto.MutationType mutateType = null; if (mutation instanceof Put) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index caf0abb0371..523466daa6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -99,11 +100,17 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPostIncrement = new AtomicInteger(0); final AtomicInteger ctPostAppend = new AtomicInteger(0); final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndPutWithFilter = new AtomicInteger(0); final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndPutWithFilterAfterRowLock = new AtomicInteger(0); final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0); + final AtomicInteger ctPostCheckAndPutWithFilter = new AtomicInteger(0); final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndDeleteWithFilter = new AtomicInteger(0); final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0); final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0); + final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0); final AtomicInteger ctPreScannerNext = new AtomicInteger(0); final AtomicInteger ctPostScannerNext = new AtomicInteger(0); final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0); @@ -492,6 +499,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean preCheckAndPut(ObserverContext c, byte[] row, + Filter filter, Put put, boolean result) throws IOException { + ctPreCheckAndPutWithFilter.incrementAndGet(); + return true; + } + @Override public boolean preCheckAndPutAfterRowLock(ObserverContext e, byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, @@ -500,6 +514,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean preCheckAndPutAfterRowLock(ObserverContext c, + byte[] row, Filter filter, Put put, boolean result) throws IOException { + ctPreCheckAndPutWithFilterAfterRowLock.incrementAndGet(); + return true; + } + @Override public boolean postCheckAndPut(ObserverContext e, byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator, @@ -508,6 +529,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean postCheckAndPut(ObserverContext c, byte[] row, + Filter filter, Put put, boolean result) throws IOException { + ctPostCheckAndPutWithFilter.incrementAndGet(); + return true; + } + @Override public boolean preCheckAndDelete(ObserverContext e, byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator, @@ -516,6 +544,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean preCheckAndDelete(ObserverContext c, byte[] row, + Filter filter, Delete delete, boolean result) throws IOException { + ctPreCheckAndDeleteWithFilter.incrementAndGet(); + return true; + } + @Override public boolean preCheckAndDeleteAfterRowLock(ObserverContext e, byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, @@ -524,6 +559,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean preCheckAndDeleteAfterRowLock(ObserverContext c, + byte[] row, Filter filter, Delete delete, boolean result) throws IOException { + ctPreCheckAndDeleteWithFilterAfterRowLock.incrementAndGet(); + return true; + } + @Override public boolean postCheckAndDelete(ObserverContext e, byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator, @@ -532,6 +574,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public boolean postCheckAndDelete(ObserverContext e, byte[] row, + Filter filter, Delete delete, boolean result) throws IOException { + ctPostCheckAndDeleteWithFilter.incrementAndGet(); + return true; + } + @Override public Result preAppendAfterRowLock(ObserverContext e, Append append) throws IOException { @@ -693,28 +742,52 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPostCloseRegionOperation.get(); } - public boolean hadPreCheckAndPut() { - return ctPreCheckAndPut.get() > 0; + public int getPreCheckAndPut() { + return ctPreCheckAndPut.get(); } - public boolean hadPreCheckAndPutAfterRowLock() { - return ctPreCheckAndPutAfterRowLock.get() > 0; + public int getPreCheckAndPutWithFilter() { + return ctPreCheckAndPutWithFilter.get(); } - public boolean hadPostCheckAndPut() { - return ctPostCheckAndPut.get() > 0; + public int getPreCheckAndPutAfterRowLock() { + return ctPreCheckAndPutAfterRowLock.get(); } - public boolean hadPreCheckAndDelete() { - return ctPreCheckAndDelete.get() > 0; + public int getPreCheckAndPutWithFilterAfterRowLock() { + return ctPreCheckAndPutWithFilterAfterRowLock.get(); } - public boolean hadPreCheckAndDeleteAfterRowLock() { - return ctPreCheckAndDeleteAfterRowLock.get() > 0; + public int getPostCheckAndPut() { + return ctPostCheckAndPut.get(); } - public boolean hadPostCheckAndDelete() { - return ctPostCheckAndDelete.get() > 0; + public int getPostCheckAndPutWithFilter() { + return ctPostCheckAndPutWithFilter.get(); + } + + public int getPreCheckAndDelete() { + return ctPreCheckAndDelete.get(); + } + + public int getPreCheckAndDeleteWithFilter() { + return ctPreCheckAndDeleteWithFilter.get(); + } + + public int getPreCheckAndDeleteAfterRowLock() { + return ctPreCheckAndDeleteAfterRowLock.get(); + } + + public int getPreCheckAndDeleteWithFilterAfterRowLock() { + return ctPreCheckAndDeleteWithFilterAfterRowLock.get(); + } + + public int getPostCheckAndDelete() { + return ctPostCheckAndDelete.get(); + } + + public int getPostCheckAndDeleteWithFilter() { + return ctPostCheckAndDeleteWithFilter.get(); } public boolean hadPreIncrement() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 9f7664928d9..e9a354a2688 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.filter.FilterAllFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -263,12 +265,26 @@ public class TestRegionObserverInterface { p = new Put(Bytes.toBytes(0)); p.addColumn(A, A, A); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" }, - tableName, new Boolean[] { false, false, false }); + new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", + "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", + "getPostCheckAndPutWithFilter" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" }, - tableName, new Boolean[] { true, true, true }); + new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", + "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", + "getPostCheckAndPutWithFilter" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0 }); + + table.checkAndMutate(Bytes.toBytes(0), + new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) + .thenPut(p); + verifyMethodResult(SimpleRegionObserver.class, + new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", + "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", + "getPostCheckAndPutWithFilter" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); } finally { util.deleteTable(tableName); } @@ -285,14 +301,29 @@ public class TestRegionObserverInterface { Delete d = new Delete(Bytes.toBytes(0)); table.delete(d); verifyMethodResult( - SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete", - "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" }, - tableName, new Boolean[] { false, false, false }); + SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", + "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", + "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", + "getPostCheckAndDeleteWithFilter" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d); verifyMethodResult( - SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete", - "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" }, - tableName, new Boolean[] { true, true, true }); + SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", + "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", + "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", + "getPostCheckAndDeleteWithFilter" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0 }); + + table.checkAndMutate(Bytes.toBytes(0), + new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) + .thenDelete(d); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", + "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", + "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", + "getPostCheckAndDeleteWithFilter" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); } finally { util.deleteTable(tableName); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index 07b834bef32..27a8406adb9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; /** @@ -218,6 +219,11 @@ public class RegionAsTable implements Table { throw new UnsupportedOperationException(); } + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + throw new UnsupportedOperationException(); + } + @Override public void mutateRow(RowMutations rm) throws IOException { throw new UnsupportedOperationException(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 53ec7d16d4e..6b4dfcfc5a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.filter.ValueFilter; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -2147,6 +2148,128 @@ public class TestHRegion { assertEquals(0, r.size()); } + @Test + public void testCheckAndMutate_WithFilters() throws Throwable { + final byte[] FAMILY = Bytes.toBytes("fam"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, FAMILY); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + region.put(put); + + // Put with success + boolean ok = region.checkAndMutate(row, + new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + ), + new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + assertTrue(ok); + + Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = region.checkAndMutate(row, + new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")) + ), + new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))); + assertFalse(ok); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty()); + + // Delete with success + ok = region.checkAndMutate(row, + new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + ), + new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))); + assertTrue(ok); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty()); + + // Mutate with success + ok = region.checkAndRowMutate(row, + new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")) + ), + new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); + assertTrue(ok); + + result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); + } + + @Test + public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable { + final byte[] FAMILY = Bytes.toBytes("fam"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, FAMILY); + + // Put with specifying the timestamp + region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + boolean ok = region.checkAndMutate(row, + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + TimeRange.between(0, 101), + new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); + assertTrue(ok); + + Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = region.checkAndMutate(row, + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + TimeRange.between(0, 100), + new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); + assertFalse(ok); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty()); + + // Mutate with success + ok = region.checkAndRowMutate(row, + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + TimeRange.between(0, 101), + new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); + assertTrue(ok); + + result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); + } + // //////////////////////////////////////////////////////////////////////////// // Delete tests // //////////////////////////////////////////////////////////////////////////// diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java index 2bae6851f08..30b1fa1dbd9 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.thrift2.ThriftUtilities; @@ -425,6 +426,11 @@ public class ThriftTable implements Table { return new CheckAndMutateBuilderImpl(row, family); } + @Override + public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { + throw new NotImplementedException("Implement later"); + } + @Override public void mutateRow(RowMutations rm) throws IOException { TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);