HBASE-23146 Support CheckAndMutate with multiple conditions (#1114)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Toshihiro Suzuki 2020-02-26 08:09:04 +09:00 committed by GitHub
parent 9f223c2236
commit ecbed33092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1454 additions and 206 deletions

View File

@ -29,6 +29,7 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@ -289,6 +290,60 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}
/**
* Atomically checks if a row matches the specified filter. If it does, it adds the
* Put/Delete/RowMutations.
* <p>
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
* execute it. This is a fluent style API, the code is like:
*
* <pre>
* <code>
* table.checkAndMutate(row, filter).thenPut(put)
* .thenAccept(succ -> {
* if (succ) {
* System.out.println("Check and put succeeded");
* } else {
* System.out.println("Check and put failed");
* }
* });
* </code>
* </pre>
*/
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
/**
* A helper class for sending checkAndMutate request with a filter.
*/
interface CheckAndMutateWithFilterBuilder {
/**
* @param timeRange time range to check.
*/
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
/**
* @param put data to put if check succeeds
* @return {@code true} if the new put was executed, {@code false} otherwise. The return value
* will be wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenPut(Put put);
/**
* @param delete data to delete if check succeeds
* @return {@code true} if the new delete was executed, {@code false} otherwise. The return
* value will be wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenDelete(Delete delete);
/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise. The return value will be
* wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}
/**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.

View File

@ -29,6 +29,7 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -173,6 +174,36 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
};
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilder() {
private final CheckAndMutateWithFilterBuilder builder =
rawTable.checkAndMutate(row, filter);
@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
return wrap(builder.thenPut(put));
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
return wrap(builder.thenDelete(delete));
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
return wrap(builder.thenMutate(mutation));
}
};
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRespo
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
/**
* The implementation of RawAsyncTable.
@ -320,10 +319,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
@ -332,10 +331,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
@ -343,12 +342,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm),
resp -> resp.getExists()))
.call();
}
@ -359,6 +358,68 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateBuilderImpl(row, family);
}
private final class CheckAndMutateWithFilterBuilderImpl
implements CheckAndMutateWithFilterBuilder {
private final byte[] row;
private final Filter filter;
private TimeRange timeRange;
public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
this.row = Preconditions.checkNotNull(row, "row is null");
this.filter = Preconditions.checkNotNull(filter, "filter is null");
}
@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, rm),
resp -> resp.getExists()))
.call();
}
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
@ -355,6 +356,53 @@ public interface Table extends Closeable {
* @return {@code true} if the new delete was executed, {@code false} otherwise.
*/
boolean thenDelete(Delete delete) throws IOException;
/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise.
*/
boolean thenMutate(RowMutations mutation) throws IOException;
}
/**
* Atomically checks if a row matches the specified filter. If it does, it adds the
* Put/Delete/RowMutations.
* <p>
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
* execute it. This is a fluent style API, the code is like:
*
* <pre>
* <code>
* table.checkAndMutate(row, filter).thenPut(put);
* </code>
* </pre>
*/
default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Add an implementation!");
}
/**
* A helper class for sending checkAndMutate request with a filter.
*/
interface CheckAndMutateWithFilterBuilder {
/**
* @param timeRange timeRange to check
*/
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
/**
* @param put data to put if check succeeds
* @return {@code true} if the new put was executed, {@code false} otherwise.
*/
boolean thenPut(Put put) throws IOException;
/**
* @param delete data to delete if check succeeds
* @return {@code true} if the new delete was executed, {@code false} otherwise.
*/
boolean thenDelete(Delete delete) throws IOException;
/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise.

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
@ -220,58 +221,80 @@ class TableOverAsyncTable implements Table {
FutureUtils.get(table.deleteAll(deletes));
}
private static final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilder() {
private final AsyncTable.CheckAndMutateBuilder builder;
private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);
public CheckAndMutateBuilderImpl(
org.apache.hadoop.hbase.client.AsyncTable.CheckAndMutateBuilder builder) {
this.builder = builder;
}
@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
builder.qualifier(qualifier);
return this;
}
@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
builder.qualifier(qualifier);
return this;
}
@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}
@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}
@Override
public CheckAndMutateBuilder ifNotExists() {
builder.ifNotExists();
return this;
}
@Override
public CheckAndMutateBuilder ifNotExists() {
builder.ifNotExists();
return this;
}
@Override
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
builder.ifMatches(compareOp, value);
return this;
}
@Override
public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
builder.ifMatches(compareOp, value);
return this;
}
@Override
public boolean thenPut(Put put) throws IOException {
return FutureUtils.get(builder.thenPut(put));
}
@Override
public boolean thenPut(Put put) throws IOException {
return FutureUtils.get(builder.thenPut(put));
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
return FutureUtils.get(builder.thenDelete(delete));
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
return FutureUtils.get(builder.thenDelete(delete));
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
return FutureUtils.get(builder.thenMutate(mutation));
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
return FutureUtils.get(builder.thenMutate(mutation));
}
};
}
@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(table.checkAndMutate(row, family));
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilder() {
private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
table.checkAndMutate(row, filter);
@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}
@Override
public boolean thenPut(Put put) throws IOException {
return FutureUtils.get(builder.thenPut(put));
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
return FutureUtils.get(builder.thenDelete(delete));
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
return FutureUtils.get(builder.thenMutate(mutation));
}
};
}
@Override

View File

@ -30,6 +30,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@ -53,7 +54,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -187,71 +189,51 @@ public final class RequestConverter {
/**
* Create a protocol buffer MutateRequest for a conditioned put
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param comparator
* @param compareType
* @param put
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final ByteArrayComparable comparator,
final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
, put, MutationType.PUT);
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Put put) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
put, MutationType.PUT);
}
/**
* Create a protocol buffer MutateRequest for a conditioned delete
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param comparator
* @param compareType
* @param delete
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final ByteArrayComparable comparator,
final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
, delete, MutationType.DELETE);
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Delete delete) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
delete, MutationType.DELETE);
}
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
final CompareType compareType, TimeRange timeRange, final Mutation mutation,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation,
final MutationType type) throws IOException {
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
/**
* Create a protocol buffer MutateRequest for conditioned row mutations
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param comparator
* @param compareType
* @param rowMutations
* @return a mutate request
* @throws IOException
*/
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
final byte[] row, final byte[] family, final byte[] qualifier,
final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations) throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
@ -259,7 +241,7 @@ public final class RequestConverter {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
@ -275,7 +257,7 @@ public final class RequestConverter {
builder.addAction(actionBuilder.build());
}
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@ -918,25 +900,26 @@ public final class RequestConverter {
/**
* Create a protocol buffer Condition
*
* @param row
* @param family
* @param qualifier
* @param comparator
* @param compareType
* @return a Condition
* @throws IOException
*/
public static Condition buildCondition(final byte[] row, final byte[] family,
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
final TimeRange timeRange) {
return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
.setFamily(UnsafeByteOperations.unsafeWrap(family))
.setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
HConstants.EMPTY_BYTE_ARRAY : qualifier))
.setComparator(ProtobufUtil.toComparator(comparator))
.setCompareType(compareType)
.setTimeRange(ProtobufUtil.toTimeRange(timeRange))
.build();
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange) throws IOException {
Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
if (filter != null) {
builder.setFilter(ProtobufUtil.toFilter(filter));
} else {
builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
.setQualifier(UnsafeByteOperations.unsafeWrap(
qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
.setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
.setCompareType(CompareType.valueOf(op.name()));
}
return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
}
/**

View File

@ -139,11 +139,12 @@ message GetResponse {
*/
message Condition {
required bytes row = 1;
required bytes family = 2;
required bytes qualifier = 3;
required CompareType compare_type = 4;
required Comparator comparator = 5;
optional bytes family = 2;
optional bytes qualifier = 3;
optional CompareType compare_type = 4;
optional Comparator comparator = 5;
optional TimeRange time_range = 6;
optional Filter filter = 7;
}

View File

@ -138,11 +138,12 @@ message GetResponse {
*/
message Condition {
required bytes row = 1;
required bytes family = 2;
required bytes qualifier = 3;
required CompareType compare_type = 4;
required Comparator comparator = 5;
optional bytes family = 2;
optional bytes qualifier = 3;
optional CompareType compare_type = 4;
optional Comparator comparator = 5;
optional TimeRange time_range = 6;
optional Filter filter = 7;
}

View File

@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.rest.Constants;
@ -737,6 +739,11 @@ public class RemoteHTable implements Table {
return new CheckAndMutateBuilderImpl(row, family);
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Implement later");
}
@Override
public Result increment(Increment increment) throws IOException {
throw new IOException("Increment not supported");

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -511,9 +512,8 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @param result
* @return the return value to return to client if bypassing default
* processing
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
@ -521,6 +521,26 @@ public interface RegionObserver {
return result;
}
/**
* Called before checkAndPut.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
}
/**
* Called before checkAndPut but after acquiring rowlock.
* <p>
@ -540,9 +560,8 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @param result
* @return the return value to return to client if bypassing default
* processing
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
@ -550,6 +569,30 @@ public interface RegionObserver {
return result;
}
/**
* Called before checkAndPut but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Put put, boolean result) throws IOException {
return result;
}
/**
* Called after checkAndPut
* <p>
@ -571,6 +614,23 @@ public interface RegionObserver {
return result;
}
/**
* Called after checkAndPut
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed return value to return to client
*/
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
}
/**
* Called before checkAndDelete.
* <p>
@ -586,7 +646,7 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @param result
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
@ -595,6 +655,26 @@ public interface RegionObserver {
return result;
}
/**
* Called before checkAndDelete.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter column family
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
}
/**
* Called before checkAndDelete but after acquiring rowock.
* <p>
@ -614,7 +694,7 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @param result
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
@ -623,6 +703,30 @@ public interface RegionObserver {
return result;
}
/**
* Called before checkAndDelete but after acquiring rowock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
return result;
}
/**
* Called after checkAndDelete
* <p>
@ -644,6 +748,23 @@ public interface RegionObserver {
return result;
}
/**
* Called after checkAndDelete
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client
*/
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
}
/**
* Called before Append.
* <p>

View File

@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HFileLink;
@ -162,8 +163,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
@ -4205,13 +4204,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
checkMutationType(mutation, row);
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
mutation);
}
@Override
public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException {
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
}
@Override
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
}
@Override
public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
throws IOException {
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
}
/**
@ -4219,7 +4231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* switches in the few places where there is deviation.
*/
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
RowMutations rowMutations, Mutation mutation)
throws IOException {
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
@ -4233,8 +4245,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation();
try {
Get get = new Get(row);
checkFamily(family);
get.addColumn(family, qualifier);
if (family != null) {
checkFamily(family);
get.addColumn(family, qualifier);
}
if (filter != null) {
get.setFilter(filter);
}
if (timeRange != null) {
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
@ -4246,11 +4263,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Call coprocessor.
Boolean processed = null;
if (mutation instanceof Put) {
processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
qualifier, op, comparator, (Put)mutation);
if (filter != null) {
processed = this.getCoprocessorHost()
.preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
} else {
processed = this.getCoprocessorHost()
.preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
(Put) mutation);
}
} else if (mutation instanceof Delete) {
processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
qualifier, op, comparator, (Delete)mutation);
if (filter != null) {
processed = this.getCoprocessorHost()
.preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
} else {
processed = this.getCoprocessorHost()
.preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
(Delete) mutation);
}
}
if (processed != null) {
return processed;
@ -4260,20 +4289,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Supposition is that now all changes are done under row locks, then when we go to read,
// we'll get the latest on this row.
List<Cell> result = get(get, false);
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
boolean matches = false;
long cellTs = 0;
if (result.isEmpty() && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
matches = true;
cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
cellTs = kv.getTimestamp();
int compareResult = PrivateCellUtil.compareValue(kv, comparator);
matches = matches(op, compareResult);
if (filter != null) {
if (!result.isEmpty()) {
matches = true;
cellTs = result.get(0).getTimestamp();
}
} else {
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
if (result.isEmpty() && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
matches = true;
cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
cellTs = kv.getTimestamp();
int compareResult = PrivateCellUtil.compareValue(kv, comparator);
matches = matches(op, compareResult);
}
}
// If matches put the new put or delete the new delete
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@ -609,9 +610,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
@ -654,7 +656,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addResultOrException(
resultOrExceptionOrBuilder.build());
}
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
if (filter != null) {
return region.checkAndRowMutate(row, filter, timeRange, rm);
} else {
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
}
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
// even if the malformed cells are not skipped.
@ -2775,18 +2782,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.getFamily().toByteArray();
byte[] qualifier = condition.getQualifier().toByteArray();
CompareOperator op =
CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator =
ProtobufUtil.toComparator(condition.getComparator());
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
qualifier, op, comparator, timeRange, regionActionResultBuilder,
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
} else {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
@ -2935,24 +2945,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.getFamily().toByteArray();
byte[] qualifier = condition.getQualifier().toByteArray();
CompareOperator compareOp =
CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
compareOp, comparator, put);
if (filter != null) {
processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
} else {
processed = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, op, comparator, put);
}
}
if (processed == null) {
boolean result = region.checkAndMutate(row, family,
qualifier, compareOp, comparator, timeRange, put);
boolean result;
if (filter != null) {
result = region.checkAndMutate(row, filter, timeRange, put);
} else {
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
put);
}
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, family,
qualifier, compareOp, comparator, put, result);
if (filter != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
} else {
result = region.getCoprocessorHost()
.postCheckAndPut(row, family, qualifier, op, comparator, put, result);
}
}
processed = result;
}
@ -2969,23 +2996,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.getFamily().toByteArray();
byte[] qualifier = condition.getQualifier().toByteArray();
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
comparator, delete);
if (filter != null) {
processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
} else {
processed = region.getCoprocessorHost()
.preCheckAndDelete(row, family, qualifier, op, comparator, delete);
}
}
if (processed == null) {
boolean result = region.checkAndMutate(row, family,
qualifier, op, comparator, timeRange, delete);
boolean result;
if (filter != null) {
result = region.checkAndMutate(row, filter, timeRange, delete);
} else {
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
delete);
}
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
qualifier, op, comparator, delete, result);
if (filter != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
result);
} else {
result = region.getCoprocessorHost()
.postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
}
}
processed = result;
}
@ -3036,7 +3082,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
default:
break;
}
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.yetus.audience.InterfaceAudience;
@ -333,6 +334,31 @@ public interface Region extends ConfigurationObserver {
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
/**
* Atomically checks if a row matches the filter and if it does, it performs the mutation. See
* checkAndRowMutate to do many checkAndPuts at a time on a single row.
* @param row to check
* @param filter the filter
* @param mutation data to put if check succeeds
* @return true if mutation was applied, false otherwise
*/
default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
throws IOException {
return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
}
/**
* Atomically checks if a row value matches the filter and if it does, it performs the mutation.
* See checkAndRowMutate to do many checkAndPuts at a time on a single row.
* @param row to check
* @param filter the filter
* @param mutation data to put if check succeeds
* @param timeRange time range to check
* @return true if mutation was applied, false otherwise
*/
boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException;
/**
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
* it performs the row mutations. If the passed value is null, the lack of column value
@ -370,6 +396,33 @@ public interface Region extends ConfigurationObserver {
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
throws IOException;
/**
* Atomically checks if a row matches the filter and if it does, it performs the row mutations.
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
* time.
* @param row to check
* @param filter the filter
* @param mutations data to put if check succeeds
* @return true if mutations were applied, false otherwise
*/
default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
throws IOException {
return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
}
/**
* Atomically checks if a row matches the filter and if it does, it performs the row mutations.
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
* time.
* @param row to check
* @param filter the filter
* @param mutations data to put if check succeeds
* @param timeRange time range to check
* @return true if mutations were applied, false otherwise
*/
boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
RowMutations mutations) throws IOException;
/**
* Deletes the specified cells/row.
* @param delete

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -1079,6 +1080,31 @@ public class RegionCoprocessorHost
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPut(this, row, filter, put, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
@ -1088,7 +1114,7 @@ public class RegionCoprocessorHost
* @param comparator the comparator
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
@ -1111,6 +1137,33 @@ public class RegionCoprocessorHost
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndPutAfterRowLock(
final byte[] row, final Filter filter, final Put put) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
}
});
}
/**
* @param row row to check
* @param family column family
@ -1137,6 +1190,26 @@ public class RegionCoprocessorHost
});
}
/**
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @throws IOException e
*/
public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndPut(this, row, filter, put, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
@ -1145,8 +1218,8 @@ public class RegionCoprocessorHost
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed,
* or null otherwise
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
@ -1168,6 +1241,31 @@ public class RegionCoprocessorHost
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDelete(this, row, filter, delete, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
@ -1200,6 +1298,33 @@ public class RegionCoprocessorHost
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed,
* or null otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
final Delete delete) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
}
});
}
/**
* @param row row to check
* @param family column family
@ -1226,6 +1351,26 @@ public class RegionCoprocessorHost
});
}
/**
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @throws IOException e
*/
public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndDelete(this, row, filter, delete, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param append append object

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
/**
* Can be overridden in UT if you only want to implement part of the methods in {@link AsyncTable}.
@ -105,6 +106,11 @@ public class DummyAsyncTable<C extends ScanResultConsumerBase> implements AsyncT
return null;
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return null;
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return null;

View File

@ -30,6 +30,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@ -41,10 +42,17 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -401,6 +409,201 @@ public class TestAsyncTable {
assertTrue(ok);
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
.get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenMutate(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
.get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
.get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenMutate(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
.get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
// Put with success
boolean ok = table.checkAndMutate(row, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(row, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
.get();
// Put with success
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test(expected = NullPointerException.class)
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
getTable.get().checkAndMutate(row, FAMILY)
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();

View File

@ -18,14 +18,25 @@
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -182,4 +193,183 @@ public class TestCheckAndMutate {
}
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// Put with success
boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
// Delete with success
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
assertTrue(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
// Mutate with success
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenMutate(new RowMutations(ROWKEY)
.add((Mutation) new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
assertTrue(ok);
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
}
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// Put with success
boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(ROWKEY, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))
))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
// Delete with success
ok = table.checkAndMutate(ROWKEY, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
assertTrue(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
// Mutate with success
ok = table.checkAndMutate(ROWKEY, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenMutate(new RowMutations(ROWKEY)
.add((Mutation) new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
assertTrue(ok);
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
}
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))
))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(ROWKEY, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))
))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
}
@Test(expected = NullPointerException.class)
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
try (Table table = createTable()) {
table.checkAndMutate(ROWKEY, FAMILY)
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
}
}

View File

@ -30,12 +30,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -234,8 +234,7 @@ public class TestMalformedCellFromClient {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = RequestConverter
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
HBaseProtos.CompareType.EQUAL, null);
.buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -99,11 +100,17 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostIncrement = new AtomicInteger(0);
final AtomicInteger ctPostAppend = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPutWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPutWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndPutWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@ -492,6 +499,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
ctPreCheckAndPutWithFilter.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
@ -500,6 +514,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Put put, boolean result) throws IOException {
ctPreCheckAndPutWithFilterAfterRowLock.incrementAndGet();
return true;
}
@Override
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
@ -508,6 +529,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
ctPostCheckAndPutWithFilter.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
@ -516,6 +544,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
ctPreCheckAndDeleteWithFilter.incrementAndGet();
return true;
}
@Override
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
@ -524,6 +559,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
ctPreCheckAndDeleteWithFilterAfterRowLock.incrementAndGet();
return true;
}
@Override
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
@ -532,6 +574,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
ctPostCheckAndDeleteWithFilter.incrementAndGet();
return true;
}
@Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Append append) throws IOException {
@ -693,28 +742,52 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostCloseRegionOperation.get();
}
public boolean hadPreCheckAndPut() {
return ctPreCheckAndPut.get() > 0;
public int getPreCheckAndPut() {
return ctPreCheckAndPut.get();
}
public boolean hadPreCheckAndPutAfterRowLock() {
return ctPreCheckAndPutAfterRowLock.get() > 0;
public int getPreCheckAndPutWithFilter() {
return ctPreCheckAndPutWithFilter.get();
}
public boolean hadPostCheckAndPut() {
return ctPostCheckAndPut.get() > 0;
public int getPreCheckAndPutAfterRowLock() {
return ctPreCheckAndPutAfterRowLock.get();
}
public boolean hadPreCheckAndDelete() {
return ctPreCheckAndDelete.get() > 0;
public int getPreCheckAndPutWithFilterAfterRowLock() {
return ctPreCheckAndPutWithFilterAfterRowLock.get();
}
public boolean hadPreCheckAndDeleteAfterRowLock() {
return ctPreCheckAndDeleteAfterRowLock.get() > 0;
public int getPostCheckAndPut() {
return ctPostCheckAndPut.get();
}
public boolean hadPostCheckAndDelete() {
return ctPostCheckAndDelete.get() > 0;
public int getPostCheckAndPutWithFilter() {
return ctPostCheckAndPutWithFilter.get();
}
public int getPreCheckAndDelete() {
return ctPreCheckAndDelete.get();
}
public int getPreCheckAndDeleteWithFilter() {
return ctPreCheckAndDeleteWithFilter.get();
}
public int getPreCheckAndDeleteAfterRowLock() {
return ctPreCheckAndDeleteAfterRowLock.get();
}
public int getPreCheckAndDeleteWithFilterAfterRowLock() {
return ctPreCheckAndDeleteWithFilterAfterRowLock.get();
}
public int getPostCheckAndDelete() {
return ctPostCheckAndDelete.get();
}
public int getPostCheckAndDeleteWithFilter() {
return ctPostCheckAndDeleteWithFilter.get();
}
public boolean hadPreIncrement() {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@ -263,12 +265,26 @@ public class TestRegionObserverInterface {
p = new Put(Bytes.toBytes(0));
p.addColumn(A, A, A);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
tableName, new Boolean[] { false, false, false });
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
tableName, new Boolean[] { true, true, true });
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
.thenPut(p);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
} finally {
util.deleteTable(tableName);
}
@ -285,14 +301,29 @@ public class TestRegionObserverInterface {
Delete d = new Delete(Bytes.toBytes(0));
table.delete(d);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
tableName, new Boolean[] { false, false, false });
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
"hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
tableName, new Boolean[] { true, true, true });
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
.thenDelete(d);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
} finally {
util.deleteTable(tableName);
table.close();

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
/**
@ -218,6 +219,11 @@ public class RegionAsTable implements Table {
throw new UnsupportedOperationException();
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new UnsupportedOperationException();
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
throw new UnsupportedOperationException();

View File

@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@ -2147,6 +2148,128 @@ public class TestHRegion {
assertEquals(0, r.size());
}
@Test
public void testCheckAndMutate_WithFilters() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
region.put(put);
// Put with success
boolean ok = region.checkAndMutate(row,
new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
),
new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
assertTrue(ok);
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = region.checkAndMutate(row,
new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))
),
new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
assertFalse(ok);
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
// Delete with success
ok = region.checkAndMutate(row,
new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
),
new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
assertTrue(ok);
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
// Mutate with success
ok = region.checkAndRowMutate(row,
new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
),
new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
assertTrue(ok);
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
}
@Test
public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
// Put with specifying the timestamp
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
boolean ok = region.checkAndMutate(row,
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
TimeRange.between(0, 101),
new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
assertTrue(ok);
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = region.checkAndMutate(row,
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
TimeRange.between(0, 100),
new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
assertFalse(ok);
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
// Mutate with success
ok = region.checkAndRowMutate(row,
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
TimeRange.between(0, 101),
new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
assertTrue(ok);
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
}
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
@ -425,6 +426,11 @@ public class ThriftTable implements Table {
return new CheckAndMutateBuilderImpl(row, family);
}
@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Implement later");
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);