From 22bf9a38c97f73bc1507c7de86af032500ead1ac Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 10 Aug 2020 18:57:17 +0900 Subject: [PATCH] HBASE-24680 Refactor the checkAndMutate code on the server side (#2184) Signed-off-by: Duo Zhang Signed-off-by: Josh Elser --- .../hadoop/hbase/client/CheckAndMutate.java | 11 +- .../hbase/shaded/protobuf/ProtobufUtil.java | 71 +++ .../MetricsRegionServerSource.java | 7 + .../MetricsRegionServerSourceImpl.java | 7 + .../hbase/coprocessor/RegionObserver.java | 189 +++++++ .../hadoop/hbase/regionserver/HRegion.java | 133 +++-- .../regionserver/MetricsRegionServer.java | 4 + .../hbase/regionserver/RSRpcServices.java | 426 ++++++--------- .../hadoop/hbase/regionserver/Region.java | 44 ++ .../regionserver/RegionCoprocessorHost.java | 313 +---------- .../coprocessor/SimpleRegionObserver.java | 39 ++ .../TestRegionObserverInterface.java | 73 ++- .../hbase/regionserver/TestHRegion.java | 512 +++++++++++++++++- .../regionserver/TestMetricsRegionServer.java | 3 +- 14 files changed, 1213 insertions(+), 619 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java index 5fce3e2eacb..26eb23d1040 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java @@ -214,7 +214,7 @@ public final class CheckAndMutate extends Mutation { this.op = op; this.value = value; this.filter = null; - this.timeRange = timeRange; + this.timeRange = timeRange != null ? timeRange : TimeRange.allTime(); this.action = action; } @@ -225,7 +225,7 @@ public final class CheckAndMutate extends Mutation { this.op = null; this.value = null; this.filter = filter; - this.timeRange = timeRange; + this.timeRange = timeRange != null ? timeRange : TimeRange.allTime(); this.action = action; } @@ -264,6 +264,13 @@ public final class CheckAndMutate extends Mutation { return filter; } + /** + * @return whether this has a filter or not + */ + public boolean hasFilter() { + return filter != null; + } + /** * @return the time range to check */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 09db4466c28..c82243a9837 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ExtendedCellBuilder; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; @@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ClientUtil; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -84,6 +86,7 @@ import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionStatesCount; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SlowLogParams; import org.apache.hadoop.hbase.client.SnapshotDescription; @@ -3469,4 +3472,72 @@ public final class ProtobufUtil { return clearSlowLogResponses.getIsCleaned(); } + public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition, + MutationProto mutation, CellScanner cellScanner) throws IOException { + byte[] row = condition.getRow().toByteArray(); + CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row); + Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null; + if (filter != null) { + builder.ifMatches(filter); + } else { + builder.ifMatches(condition.getFamily().toByteArray(), + condition.getQualifier().toByteArray(), + CompareOperator.valueOf(condition.getCompareType().name()), + ProtobufUtil.toComparator(condition.getComparator()).getValue()); + } + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); + builder.timeRange(timeRange); + + try { + MutationType type = mutation.getMutateType(); + switch (type) { + case PUT: + return builder.build(ProtobufUtil.toPut(mutation, cellScanner)); + case DELETE: + return builder.build(ProtobufUtil.toDelete(mutation, cellScanner)); + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); + } + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + } + + public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition, + List mutations) throws IOException { + assert mutations.size() > 0; + byte[] row = condition.getRow().toByteArray(); + CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row); + Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null; + if (filter != null) { + builder.ifMatches(filter); + } else { + builder.ifMatches(condition.getFamily().toByteArray(), + condition.getQualifier().toByteArray(), + CompareOperator.valueOf(condition.getCompareType().name()), + ProtobufUtil.toComparator(condition.getComparator()).getValue()); + } + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); + builder.timeRange(timeRange); + + try { + if (mutations.size() == 1) { + Mutation m = mutations.get(0); + if (m instanceof Put) { + return builder.build((Put) m); + } else if (m instanceof Delete) { + return builder.build((Delete) m); + } else { + throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0) + .getClass().getSimpleName().toUpperCase()); + } + } else { + return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations)); + } + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + } } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index 958495ad7d4..ce9319ecbe7 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -86,6 +86,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo */ void updateCheckAndPut(long t); + /** + * Update checkAndMutate histogram + * @param t time it took + */ + void updateCheckAndMutate(long t); + /** * Update the Get time histogram . * @@ -393,6 +399,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String DELETE_KEY = "delete"; String CHECK_AND_DELETE_KEY = "checkAndDelete"; String CHECK_AND_PUT_KEY = "checkAndPut"; + String CHECK_AND_MUTATE_KEY = "checkAndMutate"; String DELETE_BATCH_KEY = "deleteBatch"; String GET_SIZE_KEY = "getSize"; String GET_KEY = "get"; diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 4af8bece63e..e8d7f365625 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -42,6 +42,7 @@ public class MetricsRegionServerSourceImpl private final MetricHistogram deleteBatchHisto; private final MetricHistogram checkAndDeleteHisto; private final MetricHistogram checkAndPutHisto; + private final MetricHistogram checkAndMutateHisto; private final MetricHistogram getHisto; private final MetricHistogram incrementHisto; private final MetricHistogram appendHisto; @@ -112,6 +113,7 @@ public class MetricsRegionServerSourceImpl deleteBatchHisto = getMetricsRegistry().newTimeHistogram(DELETE_BATCH_KEY); checkAndDeleteHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_DELETE_KEY); checkAndPutHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_PUT_KEY); + checkAndMutateHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_MUTATE_KEY); getHisto = getMetricsRegistry().newTimeHistogram(GET_KEY); slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0L); @@ -614,6 +616,11 @@ public class MetricsRegionServerSourceImpl checkAndPutHisto.add(t); } + @Override + public void updateCheckAndMutate(long t) { + checkAndMutateHisto.add(t); + } + @Override public void updatePutBatch(long t) { putBatchHisto.add(t); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 05071fc5c4e..0bc0631a9ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; @@ -514,7 +517,11 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result the default value of the result * @return the return value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean preCheckAndPut(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, boolean result) throws IOException { @@ -535,7 +542,11 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result the default value of the result * @return the return value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean preCheckAndPut(ObserverContext c, byte[] row, Filter filter, Put put, boolean result) throws IOException { return result; @@ -562,7 +573,12 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result the default value of the result * @return the return value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)} + * instead. */ + @Deprecated default boolean preCheckAndPutAfterRowLock(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, boolean result) throws IOException { @@ -587,7 +603,12 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result the default value of the result * @return the return value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)} + * instead. */ + @Deprecated default boolean preCheckAndPutAfterRowLock(ObserverContext c, byte[] row, Filter filter, Put put, boolean result) throws IOException { return result; @@ -607,7 +628,11 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result from the checkAndPut * @return the possibly transformed return value to return to client + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean postCheckAndPut(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, boolean result) throws IOException { @@ -625,7 +650,11 @@ public interface RegionObserver { * @param put data to put if check succeeds * @param result from the checkAndPut * @return the possibly transformed return value to return to client + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean postCheckAndPut(ObserverContext c, byte[] row, Filter filter, Put put, boolean result) throws IOException { return result; @@ -648,7 +677,11 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result the default value of the result * @return the value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean preCheckAndDelete(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Delete delete, boolean result) throws IOException { @@ -669,7 +702,11 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result the default value of the result * @return the value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean preCheckAndDelete(ObserverContext c, byte[] row, Filter filter, Delete delete, boolean result) throws IOException { return result; @@ -696,7 +733,12 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result the default value of the result * @return the value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)} + * instead. */ + @Deprecated default boolean preCheckAndDeleteAfterRowLock(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Delete delete, boolean result) throws IOException { @@ -721,7 +763,12 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result the default value of the result * @return the value to return to client if bypassing default processing + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)} + * instead. */ + @Deprecated default boolean preCheckAndDeleteAfterRowLock(ObserverContext c, byte[] row, Filter filter, Delete delete, boolean result) throws IOException { return result; @@ -741,7 +788,11 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result from the CheckAndDelete * @return the possibly transformed returned value to return to client + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean postCheckAndDelete(ObserverContext c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Delete delete, boolean result) throws IOException { @@ -759,12 +810,150 @@ public interface RegionObserver { * @param delete delete to commit if check succeeds * @param result from the CheckAndDelete * @return the possibly transformed returned value to return to client + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead. */ + @Deprecated default boolean postCheckAndDelete(ObserverContext c, byte[] row, Filter filter, Delete delete, boolean result) throws IOException { return result; } + /** + * Called before checkAndMutate + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in actions beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param checkAndMutate the CheckAndMutate object + * @param result the default value of the result + * @return the return value to return to client if bypassing default processing + * @throws IOException if an error occurred on the coprocessor + */ + default CheckAndMutateResult preCheckAndMutate(ObserverContext c, + CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException { + if (checkAndMutate.getAction() instanceof Put) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFilter(), + (Put) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFamily(), + checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), + new BinaryComparator(checkAndMutate.getValue()), (Put) checkAndMutate.getAction(), + result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } else if (checkAndMutate.getAction() instanceof Delete) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFilter(), + (Delete) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFamily(), + checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), + new BinaryComparator(checkAndMutate.getValue()), (Delete) checkAndMutate.getAction(), + result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } + return result; + } + + /** + * Called before checkAndDelete but after acquiring rowlock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions. + * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors. + *

+ * Note: Do not retain references to any Cells in actions beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param checkAndMutate the CheckAndMutate object + * @param result the default value of the result + * @return the value to return to client if bypassing default processing + * @throws IOException if an error occurred on the coprocessor + */ + default CheckAndMutateResult preCheckAndMutateAfterRowLock( + ObserverContext c, CheckAndMutate checkAndMutate, + CheckAndMutateResult result) throws IOException { + if (checkAndMutate.getAction() instanceof Put) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(), + checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()), + (Put) checkAndMutate.getAction(), result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } else if (checkAndMutate.getAction() instanceof Delete) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(), + checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()), + (Delete) checkAndMutate.getAction(), result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } + return result; + } + + /** + * Called after checkAndMutate + *

+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * If need a Cell reference for later use, copy the cell and use that. + * @param c the environment provided by the region server + * @param checkAndMutate the CheckAndMutate object + * @param result from the checkAndMutate + * @return the possibly transformed returned value to return to client + * @throws IOException if an error occurred on the coprocessor + */ + default CheckAndMutateResult postCheckAndMutate(ObserverContext c, + CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException { + if (checkAndMutate.getAction() instanceof Put) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = postCheckAndPut(c, checkAndMutate.getRow(), + checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = postCheckAndPut(c, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()), + (Put) checkAndMutate.getAction(), result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } else if (checkAndMutate.getAction() instanceof Delete) { + boolean success; + if (checkAndMutate.hasFilter()) { + success = postCheckAndDelete(c, checkAndMutate.getRow(), + checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess()); + } else { + success = postCheckAndDelete(c, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()), + (Delete) checkAndMutate.getAction(), result.isSuccess()); + } + return new CheckAndMutateResult(success, null); + } + return result; + } + /** * Called before Append. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f62da4cdf8a..2cffa32c57c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -103,6 +103,8 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Delete; @@ -129,6 +131,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterWrapper; @@ -4249,43 +4252,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override + @Deprecated public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException { - return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null, - mutation); + CheckAndMutate checkAndMutate; + try { + CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row) + .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange); + if (mutation instanceof Put) { + checkAndMutate = builder.build((Put) mutation); + } else if (mutation instanceof Delete) { + checkAndMutate = builder.build((Delete) mutation); + } else { + throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass() + .getSimpleName().toUpperCase()); + } + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + return checkAndMutate(checkAndMutate).isSuccess(); } @Override + @Deprecated public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation) throws IOException { - return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation); + CheckAndMutate checkAndMutate; + try { + CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row).ifMatches(filter) + .timeRange(timeRange); + if (mutation instanceof Put) { + checkAndMutate = builder.build((Put) mutation); + } else if (mutation instanceof Delete) { + checkAndMutate = builder.build((Delete) mutation); + } else { + throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass() + .getSimpleName().toUpperCase()); + } + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + return checkAndMutate(checkAndMutate).isSuccess(); } @Override + @Deprecated public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException { - return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null); + CheckAndMutate checkAndMutate; + try { + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange).build(rm); + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + return checkAndMutate(checkAndMutate).isSuccess(); } @Override + @Deprecated public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm) throws IOException { - return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null); + CheckAndMutate checkAndMutate; + try { + checkAndMutate = CheckAndMutate.newBuilder(row).ifMatches(filter).timeRange(timeRange) + .build(rm); + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException(e.getMessage()); + } + return checkAndMutate(checkAndMutate).isSuccess(); } - /** - * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has - * switches in the few places where there is deviation. - */ - private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange, - RowMutations rowMutations, Mutation mutation) - throws IOException { - // Could do the below checks but seems wacky with two callers only. Just comment out for now. - // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't - // need these commented out checks. - // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null"); - // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set"); + @Override + public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + byte[] row = checkAndMutate.getRow(); + Filter filter = null; + byte[] family = null; + byte[] qualifier = null; + CompareOperator op = null; + ByteArrayComparable comparator = null; + if (checkAndMutate.hasFilter()) { + filter = checkAndMutate.getFilter(); + } else { + family = checkAndMutate.getFamily(); + qualifier = checkAndMutate.getQualifier(); + op = checkAndMutate.getCompareOp(); + comparator = new BinaryComparator(checkAndMutate.getValue()); + } + TimeRange timeRange = checkAndMutate.getTimeRange(); + + Mutation mutation = null; + RowMutations rowMutations = null; + if (checkAndMutate.getAction() instanceof Mutation) { + mutation = (Mutation) checkAndMutate.getAction(); + } else { + rowMutations = (RowMutations) checkAndMutate.getAction(); + } + if (mutation != null) { checkMutationType(mutation); checkRow(mutation, row); @@ -4312,32 +4375,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi checkRow(row, "doCheckAndRowMutate"); RowLock rowLock = getRowLockInternal(get.getRow(), false, null); try { - if (mutation != null && this.getCoprocessorHost() != null) { - // Call coprocessor. - Boolean processed = null; - if (mutation instanceof Put) { - if (filter != null) { - processed = this.getCoprocessorHost() - .preCheckAndPutAfterRowLock(row, filter, (Put) mutation); - } else { - processed = this.getCoprocessorHost() - .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator, - (Put) mutation); - } - } else if (mutation instanceof Delete) { - if (filter != null) { - processed = this.getCoprocessorHost() - .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation); - } else { - processed = this.getCoprocessorHost() - .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator, - (Delete) mutation); - } - } - if (processed != null) { - return processed; + if (this.getCoprocessorHost() != null) { + CheckAndMutateResult result = + getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate); + if (result != null) { + return result; } } + // NOTE: We used to wait here until mvcc caught up: mvcc.await(); // Supposition is that now all changes are done under row locks, then when we go to read, // we'll get the latest on this row. @@ -4395,10 +4440,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi mutateRow(rowMutations); } this.checkAndMutateChecksPassed.increment(); - return true; + return new CheckAndMutateResult(true, null); } this.checkAndMutateChecksFailed.increment(); - return false; + return new CheckAndMutateResult(false, null); } finally { rowLock.release(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index a66ae00dc28..afb249dad65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -153,6 +153,10 @@ public class MetricsRegionServer { serverSource.updateCheckAndPut(t); } + public void updateCheckAndMutate(long t) { + serverSource.updateCheckAndMutate(t); + } + public void updateGet(TableName tn, long t) { if (tableMetrics != null && tn != null) { tableMetrics.updateGet(tn, t); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bc00ab2e089..7fe7f194ad0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseIOException; @@ -67,6 +66,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -78,7 +79,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.VersionInfoUtil; @@ -87,10 +87,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; -import org.apache.hadoop.hbase.filter.ByteArrayComparable; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ByteBuffAllocator; -import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -620,62 +617,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - /** - * Mutate a list of rows atomically. - * @param cellScanner if non-null, the mutation data -- the Cell content. - */ - private boolean checkAndRowMutate(final HRegion region, final List actions, - final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange, - RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement) + private CheckAndMutateResult checkAndMutate(HRegion region, List actions, + CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { int countOfCompleteMutation = 0; try { if (!region.getRegionInfo().isMetaRegion()) { regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } - RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = - ClientProtos.ResultOrException.newBuilder(); + List mutations = new ArrayList<>(); for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); } MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } switch (type) { case PUT: Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); ++countOfCompleteMutation; checkCellSizeLimit(region, put); spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - rm.add(put); + mutations.add(put); break; case DELETE: Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); ++countOfCompleteMutation; spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); - rm.add(del); + mutations.add(del); break; default: throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); } - if (filter != null) { - return region.checkAndRowMutate(row, filter, timeRange, rm); + if (mutations.size() == 0) { + return new CheckAndMutateResult(true, null); } else { - return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); + CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); + CheckAndMutateResult result = null; + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); + } + if (result == null) { + result = region.checkAndMutate(checkAndMutate); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); + } + } + return result; } } finally { // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner @@ -2804,26 +2794,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } try { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; - byte[] qualifier = - condition.hasQualifier() ? condition.getQualifier().toByteArray() : null; - CompareOperator op = condition.hasCompareType() ? - CompareOperator.valueOf(condition.getCompareType().name()) : - null; - ByteArrayComparable comparator = condition.hasComparator() ? - ProtobufUtil.toComparator(condition.getComparator()) : null; - Filter filter = - condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null; - TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : - TimeRange.allTime(); - boolean processed = - checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, - qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, - spaceQuotaEnforcement); - responseBuilder.setProcessed(processed); + CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), + cellScanner, request.getCondition(), spaceQuotaEnforcement); + responseBuilder.setProcessed(result.isSuccess()); + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + ClientProtos.ResultOrException.newBuilder(); + for (int i = 0; i < regionAction.getActionCount(); i++) { + // To unify the response format with doNonAtomicRegionMutation and read through + // client's AsyncProcess we have to add an empty result instance per operation + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i); + regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); + } } catch (IOException e) { rpcServer.getMetrics().exception(e); // As it's an atomic operation with a condition, we may expect it's a global failure. @@ -2865,80 +2847,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { if (regionAction.hasCondition()) { try { - Condition condition = regionAction.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; - byte[] qualifier = condition.hasQualifier() ? - condition.getQualifier().toByteArray() : null; - CompareOperator op = condition.hasCompareType() ? - CompareOperator.valueOf(condition.getCompareType().name()) : null; - ByteArrayComparable comparator = condition.hasComparator() ? - ProtobufUtil.toComparator(condition.getComparator()) : null; - Filter filter = condition.hasFilter() ? - ProtobufUtil.toFilter(condition.getFilter()) : null; - TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); - - boolean processed; - if (regionAction.hasAtomic() && regionAction.getAtomic()) { - // RowMutations - processed = - checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, - qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, - spaceQuotaEnforcement); - } else { - if (regionAction.getActionList().isEmpty()) { - // If the region action list is empty, do nothing. - regionActionResultBuilder.setProcessed(true); + CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), + cellScanner, regionAction.getCondition(), spaceQuotaEnforcement); + regionActionResultBuilder.setProcessed(result.isSuccess()); + ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = + ClientProtos.ResultOrException.newBuilder(); + for (int i = 0; i < regionAction.getActionCount(); i++) { + if (i == 0 && result.getResult() != null) { + resultOrExceptionOrBuilder.setIndex(i); + regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder + .setResult(ProtobufUtil.toResult(result.getResult())).build()); continue; } - Action action = regionAction.getAction(0); - if (action.hasGet()) { - throw new DoNotRetryIOException("CheckAndMutate doesn't support GET=" - + action.getGet()); - } - MutationProto mutation = action.getMutation(); - switch (mutation.getMutateType()) { - case PUT: - Put put = ProtobufUtil.toPut(mutation, cellScanner); - checkCellSizeLimit(region, put); - // Throws an exception when violated - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - quota.addMutation(put); - - if (filter != null) { - processed = region.checkAndMutate(row, filter, timeRange, put); - } else { - processed = region.checkAndMutate(row, family, qualifier, op, comparator, - timeRange, put); - } - break; - - case DELETE: - Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); - checkCellSizeLimit(region, delete); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); - quota.addMutation(delete); - - if (filter != null) { - processed = region.checkAndMutate(row, filter, timeRange, delete); - } else { - processed = region.checkAndMutate(row, family, qualifier, op, comparator, - timeRange, delete); - } - break; - - default: - throw new DoNotRetryIOException("CheckAndMutate doesn't support " - + mutation.getMutateType()); - } - // To unify the response format with doNonAtomicRegionMutation and read through // client's AsyncProcess we have to add an empty result instance per operation - regionActionResultBuilder.addResultOrException( - ClientProtos.ResultOrException.newBuilder().setIndex(0).build()); + resultOrExceptionOrBuilder.clear(); + resultOrExceptionOrBuilder.setIndex(i); + regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); } - regionActionResultBuilder.setProcessed(processed); } catch (IOException e) { rpcServer.getMetrics().exception(e); // As it's an atomic operation with a condition, we may expect it's a global failure. @@ -3039,10 +2965,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; RpcCallContext context = RpcServer.getCurrentCall().orElse(null); - ActivePolicyEnforcement spaceQuotaEnforcement = null; - MutationType type = null; - HRegion region = null; - long before = EnvironmentEdgeManager.currentTime(); // Clear scanner so we are not holding on to reference across call. if (controller != null) { controller.setCellScanner(null); @@ -3051,143 +2973,58 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); rpcMutateRequestCount.increment(); - region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaRegion()) { regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; - Result r = null; - Boolean processed = null; - type = mutation.getMutateType(); - quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE); - spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); + ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager() + .getActiveEnforcements(); - switch (type) { - case APPEND: - // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case INCREMENT: - // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case PUT: - Put put = ProtobufUtil.toPut(mutation, cellScanner); - checkCellSizeLimit(region, put); - // Throws an exception when violated - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - quota.addMutation(put); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; - byte[] qualifier = condition.hasQualifier() ? - condition.getQualifier().toByteArray() : null; - CompareOperator op = condition.hasCompareType() ? - CompareOperator.valueOf(condition.getCompareType().name()) : null; - ByteArrayComparable comparator = condition.hasComparator() ? - ProtobufUtil.toComparator(condition.getComparator()) : null; - Filter filter = condition.hasFilter() ? - ProtobufUtil.toFilter(condition.getFilter()) : null; - TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : - TimeRange.allTime(); - if (region.getCoprocessorHost() != null) { - if (filter != null) { - processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put); - } else { - processed = region.getCoprocessorHost() - .preCheckAndPut(row, family, qualifier, op, comparator, put); - } - } - if (processed == null) { - boolean result; - if (filter != null) { - result = region.checkAndMutate(row, filter, timeRange, put); - } else { - result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, - put); - } - if (region.getCoprocessorHost() != null) { - if (filter != null) { - result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result); - } else { - result = region.getCoprocessorHost() - .postCheckAndPut(row, family, qualifier, op, comparator, put, result); - } - } - processed = result; - } - } else { - region.put(put); + if (request.hasCondition()) { + CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, + request.getCondition(), spaceQuotaEnforcement); + builder.setProcessed(result.isSuccess()); + boolean clientCellBlockSupported = isClientCellBlockSupport(context); + addResult(builder, result.getResult(), controller, clientCellBlockSupported); + if (clientCellBlockSupported) { + addSize(context, result.getResult(), null); + } + } else { + Result r = null; + Boolean processed = null; + MutationType type = mutation.getMutateType(); + switch (type) { + case APPEND: + // TODO: this doesn't actually check anything. + r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case INCREMENT: + // TODO: this doesn't actually check anything. + r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case PUT: + put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); processed = Boolean.TRUE; - } - break; - case DELETE: - Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); - checkCellSizeLimit(region, delete); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); - quota.addMutation(delete); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; - byte[] qualifier = condition.hasQualifier() ? - condition.getQualifier().toByteArray() : null; - CompareOperator op = condition.hasCompareType() ? - CompareOperator.valueOf(condition.getCompareType().name()) : null; - ByteArrayComparable comparator = condition.hasComparator() ? - ProtobufUtil.toComparator(condition.getComparator()) : null; - Filter filter = condition.hasFilter() ? - ProtobufUtil.toFilter(condition.getFilter()) : null; - TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : - TimeRange.allTime(); - if (region.getCoprocessorHost() != null) { - if (filter != null) { - processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete); - } else { - processed = region.getCoprocessorHost() - .preCheckAndDelete(row, family, qualifier, op, comparator, delete); - } - } - if (processed == null) { - boolean result; - if (filter != null) { - result = region.checkAndMutate(row, filter, timeRange, delete); - } else { - result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange, - delete); - } - if (region.getCoprocessorHost() != null) { - if (filter != null) { - result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete, - result); - } else { - result = region.getCoprocessorHost() - .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result); - } - } - processed = result; - } - } else { - region.delete(delete); + break; + case DELETE: + delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); processed = Boolean.TRUE; - } - break; - default: - throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); - } - if (processed != null) { - builder.setProcessed(processed.booleanValue()); - } - boolean clientCellBlockSupported = isClientCellBlockSupport(context); - addResult(builder, r, controller, clientCellBlockSupported); - if (clientCellBlockSupported) { - addSize(context, r, null); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); + } + if (processed != null) { + builder.setProcessed(processed); + } + boolean clientCellBlockSupported = isClientCellBlockSupport(context); + addResult(builder, r, controller, clientCellBlockSupported); + if (clientCellBlockSupported) { + addSize(context, r, null); + } } return builder.build(); } catch (IOException ie) { @@ -3197,32 +3034,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (quota != null) { quota.close(); } - // Update metrics - final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); - if (metricsRegionServer != null && type != null) { - long after = EnvironmentEdgeManager.currentTime(); - switch (type) { - case DELETE: - if (request.hasCondition()) { - metricsRegionServer.updateCheckAndDelete(after - before); - } else { - metricsRegionServer.updateDelete( - region == null ? null : region.getRegionInfo().getTable(), after - before); - } - break; + } + } + + private void put(HRegion region, OperationQuota quota, MutationProto mutation, + CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { + long before = EnvironmentEdgeManager.currentTime(); + Put put = ProtobufUtil.toPut(mutation, cellScanner); + checkCellSizeLimit(region, put); + spaceQuota.getPolicyEnforcement(region).check(put); + quota.addMutation(put); + region.put(put); + + MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + long after = EnvironmentEdgeManager.currentTime(); + metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before); + } + } + + private void delete(HRegion region, OperationQuota quota, MutationProto mutation, + CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { + long before = EnvironmentEdgeManager.currentTime(); + Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + checkCellSizeLimit(region, delete); + spaceQuota.getPolicyEnforcement(region).check(delete); + quota.addMutation(delete); + region.delete(delete); + + MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + long after = EnvironmentEdgeManager.currentTime(); + metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before); + } + } + + private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, + MutationProto mutation, CellScanner cellScanner, Condition condition, + ActivePolicyEnforcement spaceQuota) throws IOException { + long before = EnvironmentEdgeManager.currentTime(); + CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, + cellScanner); + checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); + spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction()); + quota.addMutation((Mutation) checkAndMutate.getAction()); + + CheckAndMutateResult result = null; + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); + } + if (result == null) { + result = region.checkAndMutate(checkAndMutate); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); + } + } + MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); + if (metricsRegionServer != null) { + long after = EnvironmentEdgeManager.currentTime(); + metricsRegionServer.updateCheckAndMutate(after - before); + + MutationType type = mutation.getMutateType(); + switch (type) { case PUT: - if (request.hasCondition()) { - metricsRegionServer.updateCheckAndPut(after - before); - } else { - metricsRegionServer.updatePut( - region == null ? null : region.getRegionInfo().getTable(),after - before); - } + metricsRegionServer.updateCheckAndPut(after - before); + break; + case DELETE: + metricsRegionServer.updateCheckAndDelete(after - before); break; default: break; - } } } + return result; } // This is used to keep compatible with the old client implementation. Consider remove it if we diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 1790468d8b1..d03d19f173b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -308,7 +310,11 @@ public interface Region extends ConfigurationObserver { * @param comparator the expected value * @param mutation data to put if check succeeds * @return true if mutation was applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, ByteArrayComparable comparator, Mutation mutation) throws IOException { return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation); @@ -327,7 +333,11 @@ public interface Region extends ConfigurationObserver { * @param mutation data to put if check succeeds * @param timeRange time range to check * @return true if mutation was applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException; @@ -338,7 +348,11 @@ public interface Region extends ConfigurationObserver { * @param filter the filter * @param mutation data to put if check succeeds * @return true if mutation was applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation) throws IOException { return checkAndMutate(row, filter, TimeRange.allTime(), mutation); @@ -352,7 +366,11 @@ public interface Region extends ConfigurationObserver { * @param mutation data to put if check succeeds * @param timeRange time range to check * @return true if mutation was applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation) throws IOException; @@ -368,7 +386,11 @@ public interface Region extends ConfigurationObserver { * @param comparator the expected value * @param mutations data to put if check succeeds * @return true if mutations were applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, RowMutations mutations) throws IOException { return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), @@ -388,7 +410,11 @@ public interface Region extends ConfigurationObserver { * @param mutations data to put if check succeeds * @param timeRange time range to check * @return true if mutations were applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations) throws IOException; @@ -401,7 +427,11 @@ public interface Region extends ConfigurationObserver { * @param filter the filter * @param mutations data to put if check succeeds * @return true if mutations were applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations) throws IOException { return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations); @@ -416,10 +446,24 @@ public interface Region extends ConfigurationObserver { * @param mutations data to put if check succeeds * @param timeRange time range to check * @return true if mutations were applied, false otherwise + * + * @deprecated since 2.4.0 and will be removed in 4.0.0. Use + * {@link #checkAndMutate(CheckAndMutate)} instead. */ + @Deprecated boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange, RowMutations mutations) throws IOException; + /** + * Atomically checks if a row matches the conditions and if it does, it performs the actions. + * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a + * time. + * @param checkAndMutate the CheckAndMutate object + * @return true if mutations were applied, false otherwise + * @throws IOException if an error occurred in this method + */ + CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException; + /** * Deletes the specified cells/row. * @param delete diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 93c314d46d5..56664db15eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RawCellBuilder; @@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.RawCellBuilderFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SharedConnection; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.filter.ByteArrayComparable; -import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -1051,322 +1050,70 @@ public class RegionCoprocessorHost /** * Supports Coprocessor 'bypass'. - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param put data to put if check succeeds + * @param checkAndMutate the CheckAndMutate object * @return true or false to return to client if default processing should be bypassed, or null - * otherwise + * otherwise + * @throws IOException if an error occurred on the coprocessor */ - public Boolean preCheckAndPut(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Put put) - throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndPut(this, row, family, qualifier, - op, comparator, put, getResult()); - } - }); - } - - /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param filter filter - * @param put data to put if check succeeds - * @return true or false to return to client if default processing should be bypassed, or null - * otherwise - */ - public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put) + public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException { boolean bypassable = true; - boolean defaultResult = false; + CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); if (coprocEnvironments.isEmpty()) { return null; } return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { + new ObserverOperationWithResult( + regionObserverGetter, defaultResult, bypassable) { @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndPut(this, row, filter, put, getResult()); + public CheckAndMutateResult call(RegionObserver observer) throws IOException { + return observer.preCheckAndMutate(this, checkAndMutate, getResult()); } }); } /** * Supports Coprocessor 'bypass'. - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param put data to put if check succeeds + * @param checkAndMutate the CheckAndMutate object * @return true or false to return to client if default processing should be bypassed, or null * otherwise + * @throws IOException if an error occurred on the coprocessor */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", - justification="Null is legit") - public Boolean preCheckAndPutAfterRowLock( - final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Put put) throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, - op, comparator, put, getResult()); - } - }); - } - - /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param filter filter - * @param put data to put if check succeeds - * @return true or false to return to client if default processing should be bypassed, or null - * otherwise - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", - justification="Null is legit") - public Boolean preCheckAndPutAfterRowLock( - final byte[] row, final Filter filter, final Put put) throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult()); - } - }); - } - - /** - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param put data to put if check succeeds - * @throws IOException e - */ - public boolean postCheckAndPut(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Put put, - boolean result) throws IOException { - if (this.coprocEnvironments.isEmpty()) { - return result; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, result) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.postCheckAndPut(this, row, family, qualifier, - op, comparator, put, getResult()); - } - }); - } - - /** - * @param row row to check - * @param filter filter - * @param put data to put if check succeeds - * @throws IOException e - */ - public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put, - boolean result) throws IOException { - if (this.coprocEnvironments.isEmpty()) { - return result; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, result) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.postCheckAndPut(this, row, filter, put, getResult()); - } - }); - } - - /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should be bypassed, or null - * otherwise - */ - public Boolean preCheckAndDelete(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Delete delete) - throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndDelete(this, row, family, - qualifier, op, comparator, delete, getResult()); - } - }); - } - - /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param filter filter - * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should be bypassed, or null - * otherwise - */ - public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete) + public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) throws IOException { boolean bypassable = true; - boolean defaultResult = false; + CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); if (coprocEnvironments.isEmpty()) { return null; } return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { + new ObserverOperationWithResult( + regionObserverGetter, defaultResult, bypassable) { @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndDelete(this, row, filter, delete, getResult()); + public CheckAndMutateResult call(RegionObserver observer) throws IOException { + return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); } }); } /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should be bypassed, - * or null otherwise + * @param checkAndMutate the CheckAndMutate object + * @param result the result returned by the checkAndMutate + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise + * @throws IOException if an error occurred on the coprocessor */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", - justification="Null is legit") - public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, - final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, - final Delete delete) throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndDeleteAfterRowLock(this, row, - family, qualifier, op, comparator, delete, getResult()); - } - }); - } - - /** - * Supports Coprocessor 'bypass'. - * @param row row to check - * @param filter filter - * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should be bypassed, - * or null otherwise - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", - justification="Null is legit") - public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter, - final Delete delete) throws IOException { - boolean bypassable = true; - boolean defaultResult = false; - if (coprocEnvironments.isEmpty()) { - return null; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, - defaultResult, bypassable) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult()); - } - }); - } - - /** - * @param row row to check - * @param family column family - * @param qualifier column qualifier - * @param op the comparison operation - * @param comparator the comparator - * @param delete delete to commit if check succeeds - * @throws IOException e - */ - public boolean postCheckAndDelete(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOperator op, - final ByteArrayComparable comparator, final Delete delete, - boolean result) throws IOException { + public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, + CheckAndMutateResult result) throws IOException { if (this.coprocEnvironments.isEmpty()) { return result; } return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, result) { - @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.postCheckAndDelete(this, row, family, - qualifier, op, comparator, delete, getResult()); - } - }); - } - - /** - * @param row row to check - * @param filter filter - * @param delete delete to commit if check succeeds - * @throws IOException e - */ - public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete, - boolean result) throws IOException { - if (this.coprocEnvironments.isEmpty()) { - return result; - } - return execOperationWithResult( - new ObserverOperationWithResult(regionObserverGetter, result) { + new ObserverOperationWithResult( + regionObserverGetter, result) { @Override - public Boolean call(RegionObserver observer) throws IOException { - return observer.postCheckAndDelete(this, row, filter, delete, getResult()); + public CheckAndMutateResult call(RegionObserver observer) throws IOException { + return observer.postCheckAndMutate(this, checkAndMutate, getResult()); } }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index ef62901f182..07273853f82 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -113,6 +115,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0); final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0); final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndMutate = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndMutateAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPostCheckAndMutate = new AtomicInteger(0); final AtomicInteger ctPreScannerNext = new AtomicInteger(0); final AtomicInteger ctPostScannerNext = new AtomicInteger(0); final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0); @@ -583,6 +588,28 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return true; } + @Override + public CheckAndMutateResult preCheckAndMutate(ObserverContext c, + CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException { + ctPreCheckAndMutate.incrementAndGet(); + return RegionObserver.super.preCheckAndMutate(c, checkAndMutate, result); + } + + @Override + public CheckAndMutateResult preCheckAndMutateAfterRowLock( + ObserverContext c, CheckAndMutate checkAndMutate, + CheckAndMutateResult result) throws IOException { + ctPreCheckAndMutateAfterRowLock.incrementAndGet(); + return RegionObserver.super.preCheckAndMutateAfterRowLock(c, checkAndMutate, result); + } + + @Override + public CheckAndMutateResult postCheckAndMutate(ObserverContext c, + CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException { + ctPostCheckAndMutate.incrementAndGet(); + return RegionObserver.super.postCheckAndMutate(c, checkAndMutate, result); + } + @Override public Result preAppendAfterRowLock(ObserverContext e, Append append) throws IOException { @@ -826,6 +853,18 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPostCheckAndDeleteWithFilter.get(); } + public int getPreCheckAndMutate() { + return ctPreCheckAndMutate.get(); + } + + public int getPreCheckAndMutateAfterRowLock() { + return ctPreCheckAndMutateAfterRowLock.get(); + } + + public int getPostCheckAndMutate() { + return ctPostCheckAndMutate.get(); + } + public boolean hadPreIncrement() { return ctPreIncrement.get() > 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 5b770274678..a3502bf52bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionLocator; @@ -267,15 +269,17 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", - "getPostCheckAndPutWithFilter" }, - tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + "getPostCheckAndPutWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 }); table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p); verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", - "getPostCheckAndPutWithFilter" }, - tableName, new Integer[] { 1, 1, 1, 0, 0, 0 }); + "getPostCheckAndPutWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 }); table.checkAndMutate(Bytes.toBytes(0), new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) @@ -283,8 +287,9 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut", "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock", - "getPostCheckAndPutWithFilter" }, - tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); + "getPostCheckAndPutWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 }); } finally { util.deleteTable(tableName); } @@ -304,16 +309,18 @@ public class TestRegionObserverInterface { SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", - "getPostCheckAndDeleteWithFilter" }, - tableName, new Integer[] { 0, 0, 0, 0, 0, 0 }); + "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 }); table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d); verifyMethodResult( SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", - "getPostCheckAndDeleteWithFilter" }, - tableName, new Integer[] { 1, 1, 1, 0, 0, 0 }); + "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 }); table.checkAndMutate(Bytes.toBytes(0), new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A)) @@ -322,8 +329,50 @@ public class TestRegionObserverInterface { SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete", "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete", "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock", - "getPostCheckAndDeleteWithFilter" }, - tableName, new Integer[] { 1, 1, 1, 1, 1, 1 }); + "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 }); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndMutateWithRowMutationsHooks() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + + name.getMethodName()); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + try { + byte[] row = Bytes.toBytes(0); + + Put p = new Put(row).addColumn(A, A, A); + table.put(p); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 0, 0, 0 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, A) + .build(new RowMutations(row) + .add((Mutation) new Put(row).addColumn(B, B, B)) + .add((Mutation) new Delete(row)))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1 }); + + Object[] result = new Object[2]; + table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row) + .ifEquals(A, A, A) + .build(new RowMutations(row) + .add((Mutation) new Put(row).addColumn(B, B, B)) + .add((Mutation) new Delete(row)))), result); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 2, 2, 2 }); } finally { util.deleteTable(tableName); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 48b1c0ae01c..803a33bbe93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; @@ -1775,6 +1777,7 @@ public class TestHRegion { // checkAndMutate tests // //////////////////////////////////////////////////////////////////////////// @Test + @Deprecated public void testCheckAndMutate_WithEmptyRowValue() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -1844,6 +1847,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_WithWrongValue() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -1893,6 +1897,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_WithCorrectValue() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -1941,6 +1946,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -2030,6 +2036,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndPut_ThatPutWasWritten() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -2071,6 +2078,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndPut_wrongRowInPut() throws IOException { this.region = initHRegion(tableName, method, CONF, COLUMNS); Put put = new Put(row2); @@ -2085,6 +2093,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); @@ -2158,6 +2167,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_WithFilters() throws Throwable { final byte[] FAMILY = Bytes.toBytes("fam"); @@ -2232,6 +2242,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable { final byte[] FAMILY = Bytes.toBytes("fam"); @@ -2280,6 +2291,7 @@ public class TestHRegion { } @Test + @Deprecated public void testCheckAndMutate_wrongMutationType() throws Throwable { // Setting up region this.region = initHRegion(tableName, method, CONF, fam1); @@ -2289,7 +2301,7 @@ public class TestHRegion { new Increment(row).addColumn(fam1, qual1, 1)); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action must be Put or Delete", e.getMessage()); + assertEquals("Unsupported mutate type: INCREMENT", e.getMessage()); } try { @@ -2298,11 +2310,12 @@ public class TestHRegion { new Increment(row).addColumn(fam1, qual1, 1)); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action must be Put or Delete", e.getMessage()); + assertEquals("Unsupported mutate type: INCREMENT", e.getMessage()); } } @Test + @Deprecated public void testCheckAndMutate_wrongRow() throws Throwable { final byte[] wrongRow = Bytes.toBytes("wrongRow"); @@ -2314,7 +2327,8 @@ public class TestHRegion { new Put(wrongRow).addColumn(fam1, qual1, value1)); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action's getRow must match", e.getMessage()); + assertEquals("The row of the action (Put/Delete/RowMutations) doesn't " + + "match the original one ", e.getMessage()); } try { @@ -2323,7 +2337,8 @@ public class TestHRegion { new Put(wrongRow).addColumn(fam1, qual1, value1)); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action's getRow must match", e.getMessage()); + assertEquals("The row of the action (Put/Delete/RowMutations) doesn't " + + "match the original one ", e.getMessage()); } try { @@ -2335,7 +2350,8 @@ public class TestHRegion { .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2))); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action's getRow must match", e.getMessage()); + assertEquals("The row of the action (Put/Delete/RowMutations) doesn't " + + "match the original one ", e.getMessage()); } try { @@ -2347,10 +2363,494 @@ public class TestHRegion { .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2))); fail("should throw DoNotRetryIOException"); } catch (DoNotRetryIOException e) { - assertEquals("Action's getRow must match", e.getMessage()); + assertEquals("The row of the action (Put/Delete/RowMutations) doesn't " + + "match the original one ", e.getMessage()); } } + @Test + public void testCheckAndMutateWithEmptyRowValue() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] qf1 = Bytes.toBytes("qualifier"); + byte[] emptyVal = new byte[] {}; + byte[] val1 = Bytes.toBytes("value1"); + byte[] val2 = Bytes.toBytes("value2"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, fam1); + // Putting empty data in key + Put put = new Put(row1); + put.addColumn(fam1, qf1, emptyVal); + + // checkAndPut with empty value + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); + assertTrue(res.isSuccess()); + + // Putting data in key + put = new Put(row1); + put.addColumn(fam1, qf1, val1); + + // checkAndPut with correct value + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); + assertTrue(res.isSuccess()); + + // not empty anymore + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); + assertFalse(res.isSuccess()); + + Delete delete = new Delete(row1); + delete.addColumn(fam1, qf1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); + assertFalse(res.isSuccess()); + + put = new Put(row1); + put.addColumn(fam1, qf1, val2); + // checkAndPut with correct value + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); + assertTrue(res.isSuccess()); + + // checkAndDelete with correct value + delete = new Delete(row1); + delete.addColumn(fam1, qf1); + delete.addColumn(fam1, qf1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete)); + assertTrue(res.isSuccess()); + + delete = new Delete(row1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); + assertTrue(res.isSuccess()); + + // checkAndPut looking for a null value + put = new Put(row1); + put.addColumn(fam1, qf1, val1); + + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1) + .build(put)); + assertTrue(res.isSuccess()); + } + + @Test + public void testCheckAndMutateWithWrongValue() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] qf1 = Bytes.toBytes("qualifier"); + byte[] val1 = Bytes.toBytes("value1"); + byte[] val2 = Bytes.toBytes("value2"); + BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE); + BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, fam1); + // Putting data in key + Put put = new Put(row1); + put.addColumn(fam1, qf1, val1); + region.put(put); + + // checkAndPut with wrong value + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); + assertFalse(res.isSuccess()); + + // checkAndDelete with wrong value + Delete delete = new Delete(row1); + delete.addFamily(fam1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); + assertFalse(res.isSuccess()); + + // Putting data in key + put = new Put(row1); + put.addColumn(fam1, qf1, Bytes.toBytes(bd1)); + region.put(put); + + // checkAndPut with wrong value + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put)); + assertFalse(res.isSuccess()); + + // checkAndDelete with wrong value + delete = new Delete(row1); + delete.addFamily(fam1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete)); + assertFalse(res.isSuccess()); + } + + @Test + public void testCheckAndMutateWithCorrectValue() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] qf1 = Bytes.toBytes("qualifier"); + byte[] val1 = Bytes.toBytes("value1"); + BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, fam1); + // Putting data in key + long now = System.currentTimeMillis(); + Put put = new Put(row1); + put.addColumn(fam1, qf1, now, val1); + region.put(put); + + // checkAndPut with correct value + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); + assertTrue("First", res.isSuccess()); + + // checkAndDelete with correct value + Delete delete = new Delete(row1, now + 1); + delete.addColumn(fam1, qf1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete)); + assertTrue("Delete", res.isSuccess()); + + // Putting data in key + put = new Put(row1); + put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1)); + region.put(put); + + // checkAndPut with correct value + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put)); + assertTrue("Second put", res.isSuccess()); + + // checkAndDelete with correct value + delete = new Delete(row1, now + 3); + delete.addColumn(fam1, qf1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete)); + assertTrue("Second delete", res.isSuccess()); + } + + @Test + public void testCheckAndMutateWithNonEqualCompareOp() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] qf1 = Bytes.toBytes("qualifier"); + byte[] val1 = Bytes.toBytes("value1"); + byte[] val2 = Bytes.toBytes("value2"); + byte[] val3 = Bytes.toBytes("value3"); + byte[] val4 = Bytes.toBytes("value4"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, fam1); + // Putting val3 in key + Put put = new Put(row1); + put.addColumn(fam1, qf1, val3); + region.put(put); + + // Test CompareOp.LESS: original = val3, compare with val3, fail + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.LESS: original = val3, compare with val4, fail + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.LESS: original = val3, compare with val2, + // succeed (now value = val2) + put = new Put(row1); + put.addColumn(fam1, qf1, val2); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put)); + assertTrue(res.isSuccess()); + + // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, + // succeed (value still = val2) + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put)); + assertTrue(res.isSuccess()); + + // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, + // succeed (now value = val3) + put = new Put(row1); + put.addColumn(fam1, qf1, val3); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put)); + assertTrue(res.isSuccess()); + + // Test CompareOp.GREATER: original = val3, compare with val3, fail + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.GREATER: original = val3, compare with val2, fail + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.GREATER: original = val3, compare with val4, + // succeed (now value = val2) + put = new Put(row1); + put.addColumn(fam1, qf1, val2); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put)); + assertTrue(res.isSuccess()); + + // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put)); + assertFalse(res.isSuccess()); + + // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, + // succeed (value still = val2) + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put)); + assertTrue(res.isSuccess()); + + // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put)); + assertTrue(res.isSuccess()); + } + + @Test + public void testCheckAndPutThatPutWasWritten() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] fam2 = Bytes.toBytes("fam2"); + byte[] qf1 = Bytes.toBytes("qualifier"); + byte[] val1 = Bytes.toBytes("value1"); + byte[] val2 = Bytes.toBytes("value2"); + + byte[][] families = { fam1, fam2 }; + + // Setting up region + this.region = initHRegion(tableName, method, CONF, families); + // Putting data in the key to check + Put put = new Put(row1); + put.addColumn(fam1, qf1, val1); + region.put(put); + + // Creating put to add + long ts = System.currentTimeMillis(); + KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2); + put = new Put(row1); + put.add(kv); + + // checkAndPut with wrong value + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); + assertTrue(res.isSuccess()); + + Get get = new Get(row1); + get.addColumn(fam2, qf1); + Cell[] actual = region.get(get).rawCells(); + + Cell[] expected = { kv }; + + assertEquals(expected.length, actual.length); + for (int i = 0; i < actual.length; i++) { + assertEquals(expected[i], actual[i]); + } + } + + @Test + public void testCheckAndDeleteThatDeleteWasWritten() throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] fam1 = Bytes.toBytes("fam1"); + byte[] fam2 = Bytes.toBytes("fam2"); + byte[] qf1 = Bytes.toBytes("qualifier1"); + byte[] qf2 = Bytes.toBytes("qualifier2"); + byte[] qf3 = Bytes.toBytes("qualifier3"); + byte[] val1 = Bytes.toBytes("value1"); + byte[] val2 = Bytes.toBytes("value2"); + byte[] val3 = Bytes.toBytes("value3"); + byte[] emptyVal = new byte[] {}; + + byte[][] families = { fam1, fam2 }; + + // Setting up region + this.region = initHRegion(tableName, method, CONF, families); + // Put content + Put put = new Put(row1); + put.addColumn(fam1, qf1, val1); + region.put(put); + Threads.sleep(2); + + put = new Put(row1); + put.addColumn(fam1, qf1, val2); + put.addColumn(fam2, qf1, val3); + put.addColumn(fam2, qf2, val2); + put.addColumn(fam2, qf3, val1); + put.addColumn(fam1, qf3, val1); + region.put(put); + + LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString()); + + // Multi-column delete + Delete delete = new Delete(row1); + delete.addColumn(fam1, qf1); + delete.addColumn(fam2, qf1); + delete.addColumn(fam1, qf3); + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete)); + assertTrue(res.isSuccess()); + + Get get = new Get(row1); + get.addColumn(fam1, qf1); + get.addColumn(fam1, qf3); + get.addColumn(fam2, qf2); + Result r = region.get(get); + assertEquals(2, r.size()); + assertArrayEquals(val1, r.getValue(fam1, qf1)); + assertArrayEquals(val2, r.getValue(fam2, qf2)); + + // Family delete + delete = new Delete(row1); + delete.addFamily(fam2); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); + assertTrue(res.isSuccess()); + + get = new Get(row1); + r = region.get(get); + assertEquals(1, r.size()); + assertArrayEquals(val1, r.getValue(fam1, qf1)); + + // Row delete + delete = new Delete(row1); + res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) + .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete)); + assertTrue(res.isSuccess()); + get = new Get(row1); + r = region.get(get); + assertEquals(0, r.size()); + } + + @Test + public void testCheckAndMutateWithFilters() throws Throwable { + final byte[] FAMILY = Bytes.toBytes("fam"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, FAMILY); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + region.put(put); + + // Put with success + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); + assertTrue(res.isSuccess()); + + Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); + assertFalse(res.isSuccess()); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty()); + + // Delete with success + res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))); + assertTrue(res.isSuccess()); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty()); + + // Mutate with success + res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); + assertTrue(res.isSuccess()); + + result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); + } + + @Test + public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable { + final byte[] FAMILY = Bytes.toBytes("fam"); + + // Setting up region + this.region = initHRegion(tableName, method, CONF, FAMILY); + + // Put with specifying the timestamp + region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))); + assertTrue(res.isSuccess()); + + Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))); + assertFalse(res.isSuccess()); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty()); + + // Mutate with success + res = region.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); + assertTrue(res.isSuccess()); + + result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty()); + } + // //////////////////////////////////////////////////////////////////////////// // Delete tests // //////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index 4bf71c81b8e..d046b367ed7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -151,6 +151,7 @@ public class TestMetricsRegionServer { rsm.updateDelete(null, 17); rsm.updateCheckAndDelete(17); rsm.updateCheckAndPut(17); + rsm.updateCheckAndMutate(17); } HELPER.assertCounter("appendNumOps", 24, serverSource); @@ -162,7 +163,7 @@ public class TestMetricsRegionServer { HELPER.assertCounter("deleteNumOps", 17, serverSource); HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource); HELPER.assertCounter("checkAndPutNumOps", 17, serverSource); - + HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource); HELPER.assertCounter("slowAppendCount", 12, serverSource); HELPER.assertCounter("slowDeleteCount", 13, serverSource);