From b142f5dcd26f99337499c4a42889b09907e26c8c Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Thu, 26 Nov 2020 13:31:43 +0900 Subject: [PATCH] HBASE-25242 Add Increment/Append support to RowMutations (#2630) Signed-off-by: Duo Zhang Signed-off-by: Andrew Purtell --- .../hadoop/hbase/client/AsyncTable.java | 4 +- .../hadoop/hbase/client/AsyncTableImpl.java | 2 +- .../hadoop/hbase/client/CheckAndMutate.java | 112 +------ .../hbase/client/RawAsyncTableImpl.java | 10 +- .../hadoop/hbase/client/RowMutations.java | 7 +- .../org/apache/hadoop/hbase/client/Table.java | 5 +- .../hbase/client/TableOverAsyncTable.java | 4 +- .../hbase/shaded/protobuf/ProtobufUtil.java | 8 +- .../shaded/protobuf/RequestConverter.java | 37 +-- .../shaded/protobuf/ResponseConverter.java | 83 +++-- .../hbase/rest/client/RemoteHTable.java | 2 +- .../hbase/coprocessor/RegionObserver.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 147 +++++---- .../MiniBatchOperationInProgress.java | 1 + .../hbase/regionserver/RSRpcServices.java | 93 +++++- .../hadoop/hbase/regionserver/Region.java | 12 +- .../hadoop/hbase/client/DummyAsyncTable.java | 2 +- .../hadoop/hbase/client/TestAsyncTable.java | 146 ++++++++- .../hbase/client/TestAsyncTableBatch.java | 46 ++- .../client/TestAsyncTableNoncedRetry.java | 27 ++ .../hbase/client/TestCheckAndMutate.java | 118 +++++++ .../hbase/client/TestFromClientSide3.java | 46 ++- .../hbase/client/TestFromClientSide5.java | 41 ++- .../coprocessor/SimpleRegionObserver.java | 12 + .../TestRegionObserverInterface.java | 92 +++++- .../hbase/regionserver/RegionAsTable.java | 2 +- .../hbase/regionserver/TestHRegion.java | 305 +++++++++++++++++- .../hbase/thrift2/client/ThriftTable.java | 3 +- 28 files changed, 1055 insertions(+), 314 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index aae4fc7e2ec..b390909d369 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -396,9 +396,9 @@ public interface AsyncTable { * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. * @param mutation object that specifies the set of mutations to perform atomically - * @return A {@link CompletableFuture} that always returns null when complete normally. + * @return A {@link CompletableFuture} that returns results of Increment/Append operations */ - CompletableFuture mutateRow(RowMutations mutation); + CompletableFuture mutateRow(RowMutations mutation); /** * The scan API uses the observer pattern. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index f931d67a310..a124467cd96 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -218,7 +218,7 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture mutateRow(RowMutations mutation) { + public CompletableFuture mutateRow(RowMutations mutation) { return wrap(rawTable.mutateRow(mutation)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java index f7d846b44c7..b7f17f310fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java @@ -17,14 +17,7 @@ */ package org.apache.hadoop.hbase.client; -import java.util.Collections; -import java.util.List; -import java.util.NavigableMap; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellBuilder; -import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; @@ -60,7 +53,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public final class CheckAndMutate extends Mutation { +public final class CheckAndMutate implements Row { /** * A builder class for building a CheckAndMutate object. @@ -202,15 +195,15 @@ public final class CheckAndMutate extends Mutation { } /** - * @param mutation mutations to perform if check succeeds + * @param mutations mutations to perform if check succeeds * @return a CheckAndMutate object */ - public CheckAndMutate build(RowMutations mutation) { - preCheck(mutation); + public CheckAndMutate build(RowMutations mutations) { + preCheck(mutations); if (filter != null) { - return new CheckAndMutate(row, filter, timeRange, mutation); + return new CheckAndMutate(row, filter, timeRange, mutations); } else { - return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation); + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations); } } } @@ -225,6 +218,7 @@ public final class CheckAndMutate extends Mutation { return new Builder(row); } + private final byte[] row; private final byte[] family; private final byte[] qualifier; private final CompareOperator op; @@ -235,7 +229,7 @@ public final class CheckAndMutate extends Mutation { private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op, byte[] value, TimeRange timeRange, Row action) { - super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + this.row = row; this.family = family; this.qualifier = qualifier; this.op = op; @@ -246,7 +240,7 @@ public final class CheckAndMutate extends Mutation { } private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) { - super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + this.row = row; this.family = null; this.qualifier = null; this.op = null; @@ -256,6 +250,14 @@ public final class CheckAndMutate extends Mutation { this.action = action; } + /** + * @return the row + */ + @Override + public byte[] getRow() { + return row; + } + /** * @return the family to check */ @@ -311,84 +313,4 @@ public final class CheckAndMutate extends Mutation { public Row getAction() { return action; } - - @Override - public NavigableMap> getFamilyCellMap() { - if (action instanceof Mutation) { - return ((Mutation) action).getFamilyCellMap(); - } - throw new UnsupportedOperationException(); - } - - @Override - public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) { - if (action instanceof Mutation) { - return ((Mutation) action).getCellBuilder(); - } - throw new UnsupportedOperationException(); - } - - @Override - public long getTimestamp() { - if (action instanceof Mutation) { - return ((Mutation) action).getTimestamp(); - } - throw new UnsupportedOperationException(); - } - - @Override - public Mutation setTimestamp(long timestamp) { - if (action instanceof Mutation) { - return ((Mutation) action).setTimestamp(timestamp); - } - throw new UnsupportedOperationException(); - } - - @Override - public Durability getDurability() { - if (action instanceof Mutation) { - return ((Mutation) action).getDurability(); - } - throw new UnsupportedOperationException(); - } - - @Override - public Mutation setDurability(Durability d) { - if (action instanceof Mutation) { - return ((Mutation) action).setDurability(d); - } - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getAttribute(String name) { - if (action instanceof Mutation) { - return ((Mutation) action).getAttribute(name); - } - throw new UnsupportedOperationException(); - } - - @Override - public OperationWithAttributes setAttribute(String name, byte[] value) { - if (action instanceof Mutation) { - return ((Mutation) action).setAttribute(name, value); - } - throw new UnsupportedOperationException(); - } - - @Override - public int getPriority() { - if (action instanceof Mutation) { - return ((Mutation) action).getPriority(); - } - return ((RowMutations) action).getMaxPriority(); - } - - @Override - public OperationWithAttributes setPriority(int priority) { - if (action instanceof Mutation) { - return ((Mutation) action).setPriority(priority); - } - throw new UnsupportedOperationException(); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 95a569f8585..3cffad8b44d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -513,17 +513,17 @@ class RawAsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture mutateRow(RowMutations mutation) { - return this. newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs) - .action((controller, loc, stub) -> - this. mutateRow(controller, loc, stub, mutation, + public CompletableFuture mutateRow(RowMutations mutations) { + return this. newCaller(mutations.getRow(), mutations.getMaxPriority(), + writeRpcTimeoutNs).action((controller, loc, stub) -> + this. mutateRow(controller, loc, stub, mutations, (rn, rm) -> { RegionAction.Builder regionMutationBuilder = RequestConverter .buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()) .build(); - }, resp -> null)) + }, resp -> resp)) .call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index 3b0f94b9dbc..0f8b429959d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -29,7 +29,6 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti /** * Performs multiple mutations atomically on a single row. - * Currently {@link Put} and {@link Delete} are supported. * * The mutations are performed in the order in which they * were added. @@ -75,8 +74,6 @@ public class RowMutations implements Row { } /** - * Currently only supports {@link Put} and {@link Delete} mutations. - * * @param mutation The data to send. * @throws IOException if the row of added mutation doesn't match the original row */ @@ -85,15 +82,13 @@ public class RowMutations implements Row { } /** - * Currently only supports {@link Put} and {@link Delete} mutations. - * * @param mutations The data to send. * @throws IOException if the row of added mutation doesn't match the original row */ public RowMutations add(List mutations) throws IOException { for (Mutation mutation : mutations) { if (!Bytes.equals(row, mutation.getRow())) { - throw new WrongRowIOException("The row in the recently added Put/Delete <" + + throw new WrongRowIOException("The row in the recently added Mutation <" + Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" + Bytes.toStringBinary(this.row) + ">"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index eb98bc9c1f3..53da0cfb912 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -459,9 +459,10 @@ public interface Table extends Closeable { * {@link Put} and {@link Delete} are supported. * * @param rm object that specifies the set of mutations to perform atomically - * @throws IOException + * @return results of Increment/Append operations + * @throws IOException if a remote or network exception occurs. */ - default void mutateRow(final RowMutations rm) throws IOException { + default Result mutateRow(final RowMutations rm) throws IOException { throw new NotImplementedException("Add an implementation!"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 8639282d0a7..1260f313cf5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -311,8 +311,8 @@ class TableOverAsyncTable implements Table { } @Override - public void mutateRow(RowMutations rm) throws IOException { - FutureUtils.get(table.mutateRow(rm)); + public Result mutateRow(RowMutations rm) throws IOException { + return FutureUtils.get(table.mutateRow(rm)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index e00380d7171..b9a08676f8e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -3655,9 +3655,13 @@ public final class ProtobufUtil { return builder.build((Put) m); } else if (m instanceof Delete) { return builder.build((Delete) m); + } else if (m instanceof Increment) { + return builder.build((Increment) m); + } else if (m instanceof Append) { + return builder.build((Append) m); } else { - throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0) - .getClass().getSimpleName().toUpperCase()); + throw new DoNotRetryIOException("Unsupported mutate type: " + m.getClass() + .getSimpleName().toUpperCase()); } } else { return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 97e5adbda60..5dc5e3cf205 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -227,17 +227,9 @@ public final class RequestConverter { ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { - MutationType mutateType; - if (mutation instanceof Put) { - mutateType = MutationType.PUT; - } else if (mutation instanceof Delete) { - mutateType = MutationType.DELETE; - } else { - throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + - mutation.getClass().getName()); - } mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); + MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, + mutationBuilder); actionBuilder.clear(); actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); @@ -343,17 +335,9 @@ public final class RequestConverter { ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { - MutationType mutateType = null; - if (mutation instanceof Put) { - mutateType = MutationType.PUT; - } else if (mutation instanceof Delete) { - mutateType = MutationType.DELETE; - } else { - throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + - mutation.getClass().getName()); - } mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); + MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, + mutationBuilder); actionBuilder.clear(); actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); @@ -705,17 +689,9 @@ public final class RequestConverter { final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) throws IOException { for (Mutation mutation: rowMutations.getMutations()) { - MutationType type; - if (mutation instanceof Put) { - type = MutationType.PUT; - } else if (mutation instanceof Delete) { - type = MutationType.DELETE; - } else { - throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + - mutation.getClass().getName()); - } mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); + MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, + mutationBuilder); cells.add(mutation); actionBuilder.clear(); regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); @@ -723,7 +699,6 @@ public final class RequestConverter { } private static MutationType getMutationType(Mutation mutation) { - assert !(mutation instanceof CheckAndMutate); if (mutation instanceof Put) { return MutationType.PUT; } else if (mutation instanceof Delete) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index 97ab9fdf933..d62f0ac74e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -148,8 +148,6 @@ public final class ResponseConverter { actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion()); } - Object responseValue; - // For RowMutations/CheckAndMutate action, if there is an exception, the exception is set // at the RegionActionResult level and the ResultOrException is null at the original index Integer index = (indexMap == null ? null : indexMap.get(i)); @@ -158,39 +156,22 @@ public final class ResponseConverter { // If there is an exception from the server, the exception is set at // the RegionActionResult level, which has been handled above. if (actions.hasCondition()) { - Result result = null; - if (actionResult.getResultOrExceptionCount() > 0) { - ResultOrException roe = actionResult.getResultOrException(0); - if (roe.hasResult()) { - Result r = ProtobufUtil.toResult(roe.getResult(), cells); - if (!r.isEmpty()) { - result = r; - } - } - } - responseValue = new CheckAndMutateResult(actionResult.getProcessed(), result); + results.add(regionName, index, getCheckAndMutateResult(actionResult, cells)); } else { - responseValue = actionResult.getProcessed() ? - ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : - ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; + results.add(regionName, index, getMutateRowResult(actionResult, cells)); } - results.add(regionName, index, responseValue); continue; } if (actions.hasCondition()) { - Result result = null; - if (actionResult.getResultOrExceptionCount() > 0) { - ResultOrException roe = actionResult.getResultOrException(0); - Result r = ProtobufUtil.toResult(roe.getResult(), cells); - if (!r.isEmpty()) { - result = r; - } - } - responseValue = new CheckAndMutateResult(actionResult.getProcessed(), result); - results.add(regionName, 0, responseValue); + results.add(regionName, 0, getCheckAndMutateResult(actionResult, cells)); } else { + if (actionResult.hasProcessed()) { + results.add(regionName, 0, getMutateRowResult(actionResult, cells)); + continue; + } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { @@ -198,12 +179,7 @@ public final class ResponseConverter { } else if (roe.hasServiceResult()) { responseValue = roe.getServiceResult(); } else { - // Sometimes, the response is just "it was processed". Generally, this occurs for things - // like mutateRows where either we get back 'processed' (or not) and optionally some - // statistics about the regions we touched. - responseValue = actionResult.getProcessed() ? - ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : - ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; + responseValue = ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE; } results.add(regionName, roe.getIndex(), responseValue); } @@ -220,6 +196,47 @@ public final class ResponseConverter { return results; } + private static CheckAndMutateResult getCheckAndMutateResult(RegionActionResult actionResult, + CellScanner cells) throws IOException { + Result result = null; + if (actionResult.getResultOrExceptionCount() > 0) { + // Get the result of the Increment/Append operations from the first element of the + // ResultOrException list + ResultOrException roe = actionResult.getResultOrException(0); + if (roe.hasResult()) { + Result r = ProtobufUtil.toResult(roe.getResult(), cells); + if (!r.isEmpty()) { + result = r; + } + } + } + return new CheckAndMutateResult(actionResult.getProcessed(), result); + } + + private static Result getMutateRowResult(RegionActionResult actionResult, CellScanner cells) + throws IOException { + if (actionResult.getProcessed()) { + Result result = null; + if (actionResult.getResultOrExceptionCount() > 0) { + // Get the result of the Increment/Append operations from the first element of the + // ResultOrException list + ResultOrException roe = actionResult.getResultOrException(0); + Result r = ProtobufUtil.toResult(roe.getResult(), cells); + if (!r.isEmpty()) { + r.setExists(true); + result = r; + } + } + if (result != null) { + return result; + } else { + return ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE; + } + } else { + return ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; + } + } + /** * Create a CheckAndMutateResult object from a protocol buffer MutateResponse * diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 0df64eaa0b8..4ae6d243752 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -808,7 +808,7 @@ public class RemoteHTable implements Table { } @Override - public void mutateRow(RowMutations rm) throws IOException { + public Result mutateRow(RowMutations rm) throws IOException { throw new IOException("atomicMutation not supported"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 3f1c6dc7fce..9eac46f5ea3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -919,7 +919,7 @@ public interface RegionObserver { /** * Called after checkAndMutate *

- * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation. + * Note: Do not retain references to any Cells in actions beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. * @param c the environment provided by the region server * @param checkAndMutate the CheckAndMutate object diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fc3c6234109..ed32fd5293a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3491,8 +3491,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { this.checkAndPrepareMutation(mutation, timestamp); - // store the family map reference to allow for mutations - familyCellMaps[index] = mutation.getFamilyCellMap(); + if (mutation instanceof Put || mutation instanceof Delete) { + // store the family map reference to allow for mutations + familyCellMaps[index] = mutation.getFamilyCellMap(); + } + // store durability for the batch (highest durability of all operations in the batch) Durability tmpDur = region.getEffectiveDurability(mutation.getDurability()); if (tmpDur.ordinal() > durability.ordinal()) { @@ -3883,33 +3886,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Bytes.toBytes(timestamp)); miniBatchOp.incrementNumOfDeletes(); } else if (mutation instanceof Increment || mutation instanceof Append) { - // For nonce operations - canProceed[index] = startNonceOperation(nonceGroup, nonce); - if (!canProceed[index]) { - // convert duplicate increment/append to get - List results = region.get(toGet(mutation), false, nonceGroup, nonce); - retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, - Result.create(results)); - return true; - } - boolean returnResults; if (mutation instanceof Increment) { returnResults = ((Increment) mutation).isReturnResults(); - miniBatchOp.incrementNumOfIncrements(); } else { returnResults = ((Append) mutation).isReturnResults(); - miniBatchOp.incrementNumOfAppends(); } - Result result = doCoprocessorPreCallAfterRowLock(mutation); + + // For nonce operations + canProceed[index] = startNonceOperation(nonceGroup, nonce); + if (!canProceed[index]) { + Result result; + if (returnResults) { + // convert duplicate increment/append to get + List results = region.get(toGet(mutation), false, nonceGroup, nonce); + result = Result.create(results); + } else { + result = Result.EMPTY_RESULT; + } + retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result); + return true; + } + + Result result = null; + if (region.coprocessorHost != null) { + if (mutation instanceof Increment) { + result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation); + } else { + result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation); + } + } if (result != null) { retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, returnResults ? result : Result.EMPTY_RESULT); return true; } + List results = returnResults ? new ArrayList<>(mutation.size()) : null; familyCellMaps[index] = reckonDeltas(mutation, results, timestamp); - this.results[index] = results != null ? Result.create(results): Result.EMPTY_RESULT; + this.results[index] = results != null ? Result.create(results) : Result.EMPTY_RESULT; + + if (mutation instanceof Increment) { + miniBatchOp.incrementNumOfIncrements(); + } else { + miniBatchOp.incrementNumOfAppends(); + } } region.rewriteCellTags(familyCellMaps[index], mutation); @@ -3991,28 +4012,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return get; } - /** - * Do coprocessor pre-increment or pre-append after row lock call. - * @return Result returned out of the coprocessor, which means bypass all further processing - * and return the preferred Result instead, or null which means proceed. - */ - private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws IOException { - assert mutation instanceof Increment || mutation instanceof Append; - Result result = null; - if (region.coprocessorHost != null) { - if (mutation instanceof Increment) { - result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation); - } else { - result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation); - } - } - return result; - } - private Map> reckonDeltas(Mutation mutation, List results, long now) throws IOException { assert mutation instanceof Increment || mutation instanceof Append; - Map> ret = new HashMap<>(); + Map> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); // Process a Store/family at a time. for (Map.Entry> entry: mutation.getFamilyCellMap().entrySet()) { final byte[] columnFamilyName = entry.getKey(); @@ -4055,14 +4058,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List> cellPairs = new ArrayList<>(deltas.size()); + // Sort the cells so that they match the order that they appear in the Get results. + // Otherwise, we won't be able to find the existing values if the cells are not specified + // in order by the client since cells are in an array list. + sort(deltas, store.getComparator()); + // Get previous values for all columns in this family. + Get get = new Get(mutation.getRow()); + for (Cell cell: deltas) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); + } TimeRange tr; if (mutation instanceof Increment) { tr = ((Increment) mutation).getTimeRange(); } else { tr = ((Append) mutation).getTimeRange(); } - List currentValues = get(mutation, store, deltas, tr); + + if (tr != null) { + get.setTimeRange(tr.getMin(), tr.getMax()); + } + + List currentValues = region.get(get, false); // Iterate the input columns and update existing values if they were found, otherwise // add new column initialized to the delta amount @@ -4156,31 +4173,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return PrivateCellUtil.getValueAsLong(cell); } - /** - * Do a specific Get on passed columnFamily and column qualifiers. - * @param mutation Mutation we are doing this Get for. - * @param store Which column family on row (TODO: Go all Gets in one go) - * @param coordinates Cells from mutation used as coordinates applied to Get. - * @return Return list of Cells found. - */ - private List get(Mutation mutation, HStore store, List coordinates, - TimeRange tr) throws IOException { - // Sort the cells so that they match the order that they appear in the Get results. - // Otherwise, we won't be able to find the existing values if the cells are not specified - // in order by the client since cells are in an array list. - // TODO: I don't get why we are sorting. St.Ack 20150107 - sort(coordinates, store.getComparator()); - Get get = new Get(mutation.getRow()); - for (Cell cell: coordinates) { - get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell)); - } - // Increments carry time range. If an Increment instance, put it on the Get. - if (tr != null) { - get.setTimeRange(tr.getMin(), tr.getMax()); - } - return region.get(get, false); - } - @Override public List> buildWALEdits(final MiniBatchOperationInProgress miniBatchOp) throws IOException { @@ -4355,6 +4347,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + // TODO Support Increment/Append operations private void checkAndMergeCPMutations(final MiniBatchOperationInProgress miniBatchOp, final List acquiredRowLocks, final long timestamp) throws IOException { visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> { @@ -4564,7 +4557,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Perform a batch of mutations. * - * It supports Put, Delete, Increment, Append mutations and will ignore other types passed. * Operations in a batch are stored with highest durability specified of for all operations in a * batch, except for {@link Durability#SKIP_WAL}. * @@ -4922,11 +4914,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // timestamp from get (see prepareDeleteTimestamps). } // All edits for the given row (across all column families) must happen atomically. - Result r = null; + Result r; if (mutation != null) { r = doBatchMutate(mutation, true).getResult(); } else { - mutateRow(rowMutations); + r = mutateRow(rowMutations); } this.checkAndMutateChecksPassed.increment(); return new CheckAndMutateResult(true, r); @@ -8303,11 +8295,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public void mutateRow(RowMutations rm) throws IOException { - // Don't need nonces here - RowMutations only supports puts and deletes + public Result mutateRow(RowMutations rm) throws IOException { final List m = rm.getMutations(); - batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE, - HConstants.NO_NONCE); + OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, + HConstants.NO_NONCE, HConstants.NO_NONCE); + + List results = new ArrayList<>(); + for (OperationStatus status : statuses) { + if (status.getResult() != null) { + results.add(status.getResult()); + } + } + + if (results.isEmpty()) { + return null; + } + + // Merge the results of the Increment/Append operations + List cells = new ArrayList<>(); + for (Result result : results) { + if (result.rawCells() != null) { + cells.addAll(Arrays.asList(result.rawCells())); + } + } + return Result.create(cells); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index ae5b6ec3c8e..ac6b5dc382e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -124,6 +124,7 @@ public class MiniBatchOperationInProgress { * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. * Note: The durability from CP will be replaced by the durability of corresponding mutation. + * Note: Currently only supports Put and Delete operations. * @param index the index that corresponds to the original mutation index in the batch * @param newOperations the Mutations to add */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5d9327bff54..f84a6ebbf4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -624,8 +624,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); mutations.add(del); break; + case INCREMENT: + Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, increment); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); + mutations.add(increment); + break; + case APPEND: + Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner); + ++countOfCompleteMutation; + checkCellSizeLimit(region, append); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); + mutations.add(append); + break; default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); + throw new DoNotRetryIOException("invalid mutation type : " + type); } } @@ -918,7 +932,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } /** - * Execute a list of Put/Delete mutations. + * Execute a list of mutations. * * @param builder * @param region @@ -948,12 +962,27 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } MutationProto m = action.getMutation(); Mutation mutation; - if (m.getMutateType() == MutationType.PUT) { - mutation = ProtobufUtil.toPut(m, cells); - batchContainsPuts = true; - } else { - mutation = ProtobufUtil.toDelete(m, cells); - batchContainsDelete = true; + switch (m.getMutateType()) { + case PUT: + mutation = ProtobufUtil.toPut(m, cells); + batchContainsPuts = true; + break; + + case DELETE: + mutation = ProtobufUtil.toDelete(m, cells); + batchContainsDelete = true; + break; + + case INCREMENT: + mutation = ProtobufUtil.toIncrement(m, cells); + break; + + case APPEND: + mutation = ProtobufUtil.toAppend(m, cells); + break; + + default: + throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType()); } mutationActionMap.put(mutation, action); mArray[i++] = mutation; @@ -976,11 +1005,51 @@ public class RSRpcServices implements HBaseRPCErrorHandler, OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE); + + // When atomic is true, it indicates that the mutateRow API or the batch API with + // RowMutations is called. In this case, we need to merge the results of the + // Increment/Append operations if the mutations include those operations, and set the merged + // result to the first element of the ResultOrException list + if (atomic) { + List resultOrExceptions = new ArrayList<>(); + List results = new ArrayList<>(); + for (i = 0; i < codes.length; i++) { + if (codes[i].getResult() != null) { + results.add(codes[i].getResult()); + } + if (i != 0) { + resultOrExceptions.add(getResultOrException( + ClientProtos.Result.getDefaultInstance(), i)); + } + } + + if (results.isEmpty()) { + builder.addResultOrException(getResultOrException( + ClientProtos.Result.getDefaultInstance(), 0)); + } else { + // Merge the results of the Increment/Append operations + List cellList = new ArrayList<>(); + for (Result result : results) { + if (result.rawCells() != null) { + cellList.addAll(Arrays.asList(result.rawCells())); + } + } + Result result = Result.create(cellList); + + // Set the merged result of the Increment/Append operations to the first element of the + // ResultOrException list + builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0)); + } + + builder.addAllResultOrException(resultOrExceptions); + return; + } + for (i = 0; i < codes.length; i++) { Mutation currentMutation = mArray[i]; ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); - int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i; - Exception e = null; + int index = currentAction.hasIndex() ? currentAction.getIndex() : i; + Exception e; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); @@ -2801,6 +2870,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionActionResultBuilder.setProcessed(result.isSuccess()); for (int i = 0; i < regionAction.getActionCount(); i++) { if (i == 0 && result.getResult() != null) { + // Set the result of the Increment/Append operations to the first element of the + // ResultOrException list resultOrExceptionOrBuilder.setIndex(i); regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder .setResult(ProtobufUtil.toResult(result.getResult())).build()); @@ -2831,7 +2902,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, spaceQuotaEnforcement); regionActionResultBuilder.setProcessed(true); // We no longer use MultiResponse#processed. Instead, we use - // RegionActionResult#condition. This is for backward compatibility for old clients. + // RegionActionResult#processed. This is for backward compatibility for old clients. responseBuilder.setProcessed(true); } catch (IOException e) { rpcServer.getMetrics().exception(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 900e5711415..1457cda1fca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -293,8 +293,9 @@ public interface Region extends ConfigurationObserver { /** * Perform a batch of mutations. *

- * Note this supports only Put, Delete, Increment and Append mutations and will ignore other - * types passed. + * Please do not operate on a same column of a single row in a batch, we will not consider the + * previous operation in the same batch when performing the operations in the batch. + * * @param mutations the list of mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. @@ -531,13 +532,14 @@ public interface Region extends ConfigurationObserver { Result increment(Increment increment) throws IOException; /** - * Performs multiple mutations atomically on a single row. Currently - * {@link Put} and {@link Delete} are supported. + * Performs multiple mutations atomically on a single row. * * @param mutations object that specifies the set of mutations to perform atomically + * @return results of Increment/Append operations. If no Increment/Append operations, it returns + * null * @throws IOException */ - void mutateRow(RowMutations mutations) throws IOException; + Result mutateRow(RowMutations mutations) throws IOException; /** * Perform atomic mutations within the region. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index ceed2cfd88d..d3390bf60f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -124,7 +124,7 @@ public class DummyAsyncTable implements AsyncT } @Override - public CompletableFuture mutateRow(RowMutations mutation) { + public CompletableFuture mutateRow(RowMutations mutation) { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 2944c40beee..9e6748e3437 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -250,18 +250,29 @@ public class TestAsyncTable { public void testMutateRow() throws InterruptedException, ExecutionException, IOException { AsyncTable table = getTable.get(); RowMutations mutation = new RowMutations(row); - mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); - table.mutateRow(mutation).get(); - Result result = table.get(new Get(row)).get(); + mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); + Result result = table.mutateRow(mutation).get(); + assertTrue(result.getExists()); + assertTrue(result.isEmpty()); + + result = table.get(new Get(row)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); mutation = new RowMutations(row); - mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); - mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); - table.mutateRow(mutation).get(); + mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); + mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); + mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L)); + mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc"))); + result = table.mutateRow(mutation).get(); + assertTrue(result.getExists()); + assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); + assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); + result = table.get(new Get(row)).get(); assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); + assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); + assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); } // Tests for old checkAndMutate API @@ -995,7 +1006,7 @@ public class TestAsyncTable { public void testCheckAndIncrement() throws Throwable { AsyncTable table = getTable.get(); - table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); // CheckAndIncrement with correct value CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row) @@ -1052,7 +1063,7 @@ public class TestAsyncTable { public void testCheckAndAppend() throws Throwable { AsyncTable table = getTable.get(); - table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); // CheckAndAppend with correct value CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row) @@ -1105,6 +1116,66 @@ public class TestAsyncTable { assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); } + @Test + public void testCheckAndRowMutations() throws Throwable { + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final String v1 = "v1"; + + AsyncTable table = getTable.get(); + + // Initial values + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), + new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)), + new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get(); + + // Do CheckAndRowMutations + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, q1) + .build(new RowMutations(row).add(Arrays.asList( + new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)), + new Delete(row).addColumns(FAMILY, q2), + new Increment(row).addColumn(FAMILY, q3, 1), + new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))) + ); + + CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get(); + assertTrue(result.isSuccess()); + assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); + + // Verify the value + Result r = table.get(new Get(row)).get(); + assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); + assertNull(r.getValue(FAMILY, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); + + // Do CheckAndRowMutations again + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, q1) + .build(new RowMutations(row).add(Arrays.asList( + new Delete(row).addColumns(FAMILY, q1), + new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)), + new Increment(row).addColumn(FAMILY, q3, 1), + new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))) + ); + + result = table.checkAndMutate(checkAndMutate).get(); + assertFalse(result.isSuccess()); + assertNull(result.getResult()); + + // Verify the value + r = table.get(new Get(row)).get(); + assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); + assertNull(r.getValue(FAMILY, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); + } + // Tests for batch version of checkAndMutate @Test @@ -1512,6 +1583,65 @@ public class TestAsyncTable { assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); } + @Test + public void testCheckAndRowMutationsBatch() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) + .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) + .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))) + ).get(); + + // CheckAndIncrement with correct value + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .build(new RowMutations(row).add(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")), + new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L), + new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + ))); + + // CheckAndIncrement with wrong value + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) + .build(new RowMutations(row2).add(Arrays.asList( + new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), + new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")), + new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), + new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")) + ))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0).isSuccess()); + assertEquals(2, Bytes.toLong(results.get(0).getResult() + .getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("dd", Bytes.toString(results.get(0).getResult() + .getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(results.get(1).isSuccess()); + assertNull(results.get(1).getResult()); + + Result result = table.get(new Get(row)).get(); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); + assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(row2)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); + assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); + } + @Test public void testDisabled() throws InterruptedException, ExecutionException { ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 20ec40e7bb0..3fb1a14e247 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -358,11 +358,17 @@ public class TestAsyncTableBatch { CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) - .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + .build(new RowMutations(row1) + .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) + .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) + .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) + .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); RowMutations mutations = new RowMutations(row3) - .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) - .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) + .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); @@ -378,16 +384,28 @@ public class TestAsyncTableBatch { checkAndMutate3, checkAndMutate4); List results = table.batchAll(actions).get(); - assertTrue(((CheckAndMutateResult) results.get(0)).isSuccess()); - assertNull(((CheckAndMutateResult) results.get(0)).getResult()); + CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0); + assertTrue(checkAndMutateResult.isSuccess()); + assertEquals(3L, + Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("d", + Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("b", Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); - assertTrue(((Result) results.get(2)).getExists()); - assertFalse(((CheckAndMutateResult) results.get(3)).isSuccess()); - assertNull(((CheckAndMutateResult) results.get(3)).getResult()); + + Result result = (Result) results.get(2); + assertTrue(result.getExists()); + assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + checkAndMutateResult = (CheckAndMutateResult) results.get(3); + assertFalse(checkAndMutateResult.isSuccess()); + assertNull(checkAndMutateResult.getResult()); + assertTrue(((Result) results.get(4)).isEmpty()); - CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(5); + checkAndMutateResult = (CheckAndMutateResult) results.get(5); assertTrue(checkAndMutateResult.isSuccess()); assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult() .getValue(FAMILY, Bytes.toBytes("F")))); @@ -397,12 +415,18 @@ public class TestAsyncTableBatch { assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult() .getValue(FAMILY, Bytes.toBytes("G")))); - Result result = table.get(new Get(row1)).get(); - assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + result = table.get(new Get(row1)).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); result = table.get(new Get(row3)).get(); assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); result = table.get(new Get(row4)).get(); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 82a57f24140..10b358ff625 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Optional; @@ -134,6 +135,19 @@ public class TestAsyncTableNoncedRetry { assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); } + @Test + public void testAppendWhenReturnResultsEqualsFalse() throws InterruptedException, + ExecutionException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); + Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE) + .setReturnResults(false)).get(); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isEmpty()); + } + @Test public void testIncrement() throws InterruptedException, ExecutionException { assertEquals(0, CALLED.get()); @@ -143,4 +157,17 @@ public class TestAsyncTableNoncedRetry { // make sure we called twice and the result is still correct assertEquals(2, CALLED.get()); } + + @Test + public void testIncrementWhenReturnResultsEqualsFalse() throws InterruptedException, + ExecutionException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); + Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L) + .setReturnResults(false)).get(); + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isEmpty()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java index a2f4048d283..c40f2c77f4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -702,6 +702,66 @@ public class TestCheckAndMutate { } } + @Test + public void testCheckAndRowMutations() throws Throwable { + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final String v1 = "v1"; + + try (Table table = createTable()) { + // Initial values + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), + new Put(ROWKEY).addColumn(FAMILY, q3, Bytes.toBytes(5L)), + new Put(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("a")))); + + // Do CheckAndRowMutations + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(ROWKEY) + .ifNotExists(FAMILY, q1) + .build(new RowMutations(ROWKEY).add(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, q1, Bytes.toBytes(v1)), + new Delete(ROWKEY).addColumns(FAMILY, q2), + new Increment(ROWKEY).addColumn(FAMILY, q3, 1), + new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b")))) + ); + + CheckAndMutateResult result = table.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); + + // Verify the value + Result r = table.get(new Get(ROWKEY)); + assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); + assertNull(r.getValue(FAMILY, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); + + // Do CheckAndRowMutations again + checkAndMutate = CheckAndMutate.newBuilder(ROWKEY) + .ifNotExists(FAMILY, q1) + .build(new RowMutations(ROWKEY).add(Arrays.asList( + new Delete(ROWKEY).addColumns(FAMILY, q1), + new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes(v1)), + new Increment(ROWKEY).addColumn(FAMILY, q3, 1), + new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b")))) + ); + + result = table.checkAndMutate(checkAndMutate); + assertFalse(result.isSuccess()); + assertNull(result.getResult()); + + // Verify the value + r = table.get(new Get(ROWKEY)); + assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); + assertNull(r.getValue(FAMILY, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); + assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); + } + } + // Tests for batch version of checkAndMutate @Test @@ -1098,4 +1158,62 @@ public class TestCheckAndMutate { assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); } } + + @Test + public void testCheckAndRowMutationsBatch() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) + .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) + .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))) + ); + + // CheckAndIncrement with correct value + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .build(new RowMutations(ROWKEY).add(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("B")), + new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 1L), + new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + ))); + + // CheckAndIncrement with wrong value + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) + .build(new RowMutations(ROWKEY2).add(Arrays.asList( + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), + new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("F")), + new Increment(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), + new Append(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")) + ))); + + List results = + table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results.get(0).isSuccess()); + assertEquals(2, Bytes.toLong(results.get(0).getResult() + .getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("dd", Bytes.toString(results.get(0).getResult() + .getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(results.get(1).isSuccess()); + assertNull(results.get(1).getResult()); + + Result result = table.get(new Get(ROWKEY)); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); + assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(ROWKEY2)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); + assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index 163ade525d3..d949fdbab6b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -474,11 +474,17 @@ public class TestFromClientSide3 { CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) - .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + .build(new RowMutations(row1) + .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) + .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) + .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) + .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); RowMutations mutations = new RowMutations(row3) - .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) - .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) + .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); @@ -495,16 +501,28 @@ public class TestFromClientSide3 { Object[] results = new Object[actions.size()]; table.batch(actions, results); - assertTrue(((CheckAndMutateResult) results[0]).isSuccess()); - assertNull(((CheckAndMutateResult) results[0]).getResult()); + CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; + assertTrue(checkAndMutateResult.isSuccess()); + assertEquals(3L, + Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("d", + Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); - assertTrue(((Result) results[2]).getExists()); - assertFalse(((CheckAndMutateResult) results[3]).isSuccess()); - assertNull(((CheckAndMutateResult) results[3]).getResult()); + + Result result = (Result) results[2]; + assertTrue(result.getExists()); + assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + checkAndMutateResult = (CheckAndMutateResult) results[3]; + assertFalse(checkAndMutateResult.isSuccess()); + assertNull(checkAndMutateResult.getResult()); + assertTrue(((Result) results[4]).isEmpty()); - CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[5]; + checkAndMutateResult = (CheckAndMutateResult) results[5]; assertTrue(checkAndMutateResult.isSuccess()); assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult() .getValue(FAMILY, Bytes.toBytes("F")))); @@ -514,12 +532,18 @@ public class TestFromClientSide3 { assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult() .getValue(FAMILY, Bytes.toBytes("G")))); - Result result = table.get(new Get(row1)); - assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + result = table.get(new Get(row1)); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); result = table.get(new Get(row3)); assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); result = table.get(new Get(row4)); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java index 3213c0f0baa..7a1ab5a455e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java @@ -295,21 +295,27 @@ public class TestFromClientSide5 extends FromClientSideBase { } @Test - public void testRowMutation() throws Exception { - LOG.info("Starting testRowMutation"); + public void testRowMutations() throws Exception { + LOG.info("Starting testRowMutations"); final TableName tableName = name.getTableName(); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { - byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }; + byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), + Bytes.toBytes("c"), Bytes.toBytes("d") }; + + // Test for Put operations RowMutations arm = new RowMutations(ROW); Put p = new Put(ROW); p.addColumn(FAMILY, QUALIFIERS[0], VALUE); arm.add(p); - t.mutateRow(arm); + Result r = t.mutateRow(arm); + assertTrue(r.getExists()); + assertTrue(r.isEmpty()); Get g = new Get(ROW); - Result r = t.get(g); + r = t.get(g); assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); + // Test for Put and Delete operations arm = new RowMutations(ROW); p = new Put(ROW); p.addColumn(FAMILY, QUALIFIERS[1], VALUE); @@ -318,11 +324,34 @@ public class TestFromClientSide5 extends FromClientSideBase { d.addColumns(FAMILY, QUALIFIERS[0]); arm.add(d); // TODO: Trying mutateRow again. The batch was failing with a one try only. - t.mutateRow(arm); + r = t.mutateRow(arm); + assertTrue(r.getExists()); + assertTrue(r.isEmpty()); + r = t.get(g); assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); assertNull(r.getValue(FAMILY, QUALIFIERS[0])); + // Test for Increment and Append operations + arm = new RowMutations(ROW); + arm.add(Arrays.asList( + new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE), + new Delete(ROW).addColumns(FAMILY, QUALIFIERS[1]), + new Increment(ROW).addColumn(FAMILY, QUALIFIERS[2], 5L), + new Append(ROW).addColumn(FAMILY, QUALIFIERS[3], Bytes.toBytes("abc")) + )); + r = t.mutateRow(arm); + assertTrue(r.getExists()); + assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2]))); + assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3]))); + + g = new Get(ROW); + r = t.get(g); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); + assertNull(r.getValue(FAMILY, QUALIFIERS[1])); + assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2]))); + assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3]))); + // Test that we get a region level exception try { arm = new RowMutations(ROW); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 07273853f82..e432acdda1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -777,14 +777,26 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPreBatchMutate.get() > 0; } + public int getPreBatchMutate() { + return ctPreBatchMutate.get(); + } + public boolean hadPostBatchMutate() { return ctPostBatchMutate.get() > 0; } + public int getPostBatchMutate() { + return ctPostBatchMutate.get(); + } + public boolean hadPostBatchMutateIndispensably() { return ctPostBatchMutateIndispensably.get() > 0; } + public int getPostBatchMutateIndispensably() { + return ctPostBatchMutateIndispensably.get(); + } + public boolean hadPostStartRegionOperation() { return ctPostStartRegionOperation.get() > 0; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index b679c323fbf..ff9d8a16ba9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -241,14 +241,16 @@ public class TestRegionObserverInterface { inc.addColumn(A, A, 1); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, - tableName, new Boolean[] { false, false, false }); + new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, new Boolean[] { false, false, false, false, false, false }); table.increment(inc); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" }, - tableName, new Boolean[] { true, true, true }); + new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, new Boolean[] { true, true, true, true, true, true }); } finally { util.deleteTable(tableName); table.close(); @@ -337,7 +339,75 @@ public class TestRegionObserverInterface { } @Test - public void testCheckAndMutateWithRowMutationsHooks() throws Exception { + public void testCheckAndIncrementHooks() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + + name.getMethodName()); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + try { + byte[] row = Bytes.toBytes(0); + + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 0, 0, 0 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(A, A) + .build(new Increment(row).addColumn(A, A, 1))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, Bytes.toBytes(1L)) + .build(new Increment(row).addColumn(A, A, 1))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 2, 2, 2 }); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndAppendHooks() throws Exception { + final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + + name.getMethodName()); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + try { + byte[] row = Bytes.toBytes(0); + + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 0, 0, 0 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(A, A) + .build(new Append(row).addColumn(A, A, A))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 1, 1, 1 }); + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(A, A, A) + .build(new Append(row).addColumn(A, A, A))); + verifyMethodResult( + SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate", + "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" }, + tableName, new Integer[] { 2, 2, 2 }); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndRowMutationsHooks() throws Exception { final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName()); Table table = util.createTable(tableName, new byte[][] { A, B, C }); @@ -386,14 +456,18 @@ public class TestRegionObserverInterface { app.addColumn(A, A, A); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, - new Boolean[] { false, false, false }); + new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, + new Boolean[] { false, false, false, false, false, false }); table.append(app); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName, - new Boolean[] { true, true, true }); + new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock", + "hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" }, + tableName, + new Boolean[] { true, true, true, true, true, true }); } finally { util.deleteTable(tableName); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index e488a5cfbf5..969d01afa42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -226,7 +226,7 @@ public class RegionAsTable implements Table { } @Override - public void mutateRow(RowMutations rm) throws IOException { + public Result mutateRow(RowMutations rm) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index da3f2204ddd..7d1bba5c7d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -61,6 +61,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; @@ -2385,6 +2386,7 @@ public class TestHRegion { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Putting data in key put = new Put(row1); @@ -2394,17 +2396,20 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // not empty anymore res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); Delete delete = new Delete(row1); delete.addColumn(fam1, qf1); res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); put = new Put(row1); put.addColumn(fam1, qf1, val2); @@ -2412,6 +2417,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with correct value delete = new Delete(row1); @@ -2420,11 +2426,13 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); delete = new Delete(row1); res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // checkAndPut looking for a null value put = new Put(row1); @@ -2433,6 +2441,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1) .build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2456,6 +2465,7 @@ public class TestHRegion { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with wrong value Delete delete = new Delete(row1); @@ -2463,6 +2473,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Putting data in key put = new Put(row1); @@ -2473,6 +2484,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with wrong value delete = new Delete(row1); @@ -2480,6 +2492,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2509,6 +2522,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete)); assertTrue("Delete", res.isSuccess()); + assertNull(res.getResult()); // Putting data in key put = new Put(row1); @@ -2519,6 +2533,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put)); assertTrue("Second put", res.isSuccess()); + assertNull(res.getResult()); // checkAndDelete with correct value delete = new Delete(row1, now + 3); @@ -2526,6 +2541,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete)); assertTrue("Second delete", res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2549,11 +2565,13 @@ public class TestHRegion { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS: original = val3, compare with val4, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS: original = val3, compare with val2, // succeed (now value = val2) @@ -2562,17 +2580,20 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, // succeed (now value = val3) @@ -2581,16 +2602,19 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val3, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val2, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER: original = val3, compare with val4, // succeed (now value = val2) @@ -2599,22 +2623,26 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put)); assertFalse(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); } @Test @@ -2645,6 +2673,7 @@ public class TestHRegion { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Get get = new Get(row1); get.addColumn(fam2, qf1); @@ -2699,6 +2728,7 @@ public class TestHRegion { CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Get get = new Get(row1); get.addColumn(fam1, qf1); @@ -2715,6 +2745,7 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); get = new Get(row1); r = region.get(get); @@ -2726,6 +2757,8 @@ public class TestHRegion { res = region.checkAndMutate(CheckAndMutate.newBuilder(row1) .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete)); assertTrue(res.isSuccess()); + assertNull(res.getResult()); + get = new Get(row1); r = region.get(get); assertEquals(0, r.size()); @@ -2754,6 +2787,7 @@ public class TestHRegion { Bytes.toBytes("b")))) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); @@ -2767,6 +2801,7 @@ public class TestHRegion { Bytes.toBytes("c")))) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); assertFalse(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty()); @@ -2779,6 +2814,7 @@ public class TestHRegion { Bytes.toBytes("b")))) .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty()); @@ -2794,6 +2830,7 @@ public class TestHRegion { .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))); assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); @@ -2818,6 +2855,7 @@ public class TestHRegion { .timeRange(TimeRange.between(0, 101)) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))); assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); @@ -2829,10 +2867,11 @@ public class TestHRegion { .timeRange(TimeRange.between(0, 100)) .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))); assertFalse(res.isSuccess()); + assertNull(res.getResult()); assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty()); - // Mutate with success + // RowMutations with success res = region.checkAndMutate(CheckAndMutate.newBuilder(row) .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a"))) @@ -2842,6 +2881,7 @@ public class TestHRegion { .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))); assertTrue(res.isSuccess()); + assertNull(res.getResult()); result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))); assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); @@ -2970,6 +3010,126 @@ public class TestHRegion { assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); } + @Test + public void testCheckAndIncrementAndAppend() throws Throwable { + // Setting up region + this.region = initHRegion(tableName, method, CONF, fam1); + + // CheckAndMutate with Increment and Append + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a"))) + ); + + CheckAndMutateResult result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(fam1, qual1))); + assertEquals("a", Bytes.toString(result.getResult().getValue(fam1, qual2))); + + Result r = region.get(new Get(row)); + assertEquals(1L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("a", Bytes.toString(r.getValue(fam1, qual2))); + + // Set return results to false + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L).setReturnResults(false)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a")) + .setReturnResults(false)) + ); + + result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertNull(result.getResult().getValue(fam1, qual1)); + assertNull(result.getResult().getValue(fam1, qual2)); + + r = region.get(new Get(row)); + assertEquals(2L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("aa", Bytes.toString(r.getValue(fam1, qual2))); + + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, qual) + .build(new RowMutations(row) + .add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L)) + .add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a")) + .setReturnResults(false)) + ); + + result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(3L, Bytes.toLong(result.getResult().getValue(fam1, qual1))); + assertNull(result.getResult().getValue(fam1, qual2)); + + r = region.get(new Get(row)); + assertEquals(3L, Bytes.toLong(r.getValue(fam1, qual1))); + assertEquals("aaa", Bytes.toString(r.getValue(fam1, qual2))); + } + + @Test + public void testCheckAndRowMutations() throws Throwable { + final byte[] row = Bytes.toBytes("row"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final String v1 = "v1"; + + region = initHRegion(tableName, method, CONF, fam1); + + // Initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")), + new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)), + new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")), + }); + + // Do CheckAndRowMutations + CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, q1) + .build(new RowMutations(row).add(Arrays.asList( + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)), + new Delete(row).addColumns(fam1, q2), + new Increment(row).addColumn(fam1, q3, 1), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))) + ); + + CheckAndMutateResult result = region.checkAndMutate(checkAndMutate); + assertTrue(result.isSuccess()); + assertEquals(6L, Bytes.toLong(result.getResult().getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(result.getResult().getValue(fam1, q4))); + + // Verify the value + Result r = region.get(new Get(row)); + assertEquals(v1, Bytes.toString(r.getValue(fam1, q1))); + assertNull(r.getValue(fam1, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(r.getValue(fam1, q4))); + + // Do CheckAndRowMutations again + checkAndMutate = CheckAndMutate.newBuilder(row) + .ifNotExists(fam1, q1) + .build(new RowMutations(row).add(Arrays.asList( + new Delete(row).addColumns(fam1, q1), + new Put(row).addColumn(fam1, q2, Bytes.toBytes(v1)), + new Increment(row).addColumn(fam1, q3, 1), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))) + ); + + result = region.checkAndMutate(checkAndMutate); + assertFalse(result.isSuccess()); + assertNull(result.getResult()); + + // Verify the value + r = region.get(new Get(row)); + assertEquals(v1, Bytes.toString(r.getValue(fam1, q1))); + assertNull(r.getValue(fam1, q2)); + assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(r.getValue(fam1, q4))); + } + // //////////////////////////////////////////////////////////////////////////// // Delete tests // //////////////////////////////////////////////////////////////////////////// @@ -7251,6 +7411,149 @@ public class TestHRegion { } } + @Test + public void testMutateRow() throws Exception { + final byte[] row = Bytes.toBytes("row"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final String v1 = "v1"; + + region = initHRegion(tableName, method, CONF, fam1); + + // Initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")), + new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)), + new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")), + }); + + // Do mutateRow + Result result = region.mutateRow(new RowMutations(row).add(Arrays.asList( + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)), + new Delete(row).addColumns(fam1, q2), + new Increment(row).addColumn(fam1, q3, 1), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))); + + assertNotNull(result); + assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(result.getValue(fam1, q4))); + + // Verify the value + result = region.get(new Get(row)); + assertEquals(v1, Bytes.toString(result.getValue(fam1, q1))); + assertNull(result.getValue(fam1, q2)); + assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(result.getValue(fam1, q4))); + } + + @Test + public void testMutateRowInParallel() throws Exception { + final int numReaderThreads = 100; + final CountDownLatch latch = new CountDownLatch(numReaderThreads); + + final byte[] row = Bytes.toBytes("row"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final String v1 = "v1"; + final String v2 = "v2"; + + // We need to ensure the timestamp of the delete operation is more than the previous one + final AtomicLong deleteTimestamp = new AtomicLong(); + + region = initHRegion(tableName, method, CONF, fam1); + + // Initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)) + .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2)) + .addColumn(fam1, q3, Bytes.toBytes(1L)) + .addColumn(fam1, q4, Bytes.toBytes("a")) + }); + + final AtomicReference assertionError = new AtomicReference<>(); + + // Writer thread + Thread writerThread = new Thread(() -> { + try { + while (true) { + // If all the reader threads finish, then stop the writer thread + if (latch.await(0, TimeUnit.MILLISECONDS)) { + return; + } + + // Execute the mutations. This should be done atomically + region.mutateRow(new RowMutations(row).add(Arrays.asList( + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v2)), + new Delete(row).addColumns(fam1, q2, deleteTimestamp.getAndIncrement()), + new Increment(row).addColumn(fam1, q3, 1L), + new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))); + + // We need to ensure the timestamps of the Increment/Append operations are more than the + // previous ones + Result result = region.get(new Get(row).addColumn(fam1, q3).addColumn(fam1, q4)); + long tsIncrement = result.getColumnLatestCell(fam1, q3).getTimestamp(); + long tsAppend = result.getColumnLatestCell(fam1, q4).getTimestamp(); + + // Put the initial values + region.batchMutate(new Mutation[] { + new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)) + .addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2)) + .addColumn(fam1, q3, tsIncrement + 1, Bytes.toBytes(1L)) + .addColumn(fam1, q4, tsAppend + 1, Bytes.toBytes("a")) + }); + } + } catch (Exception e) { + e.printStackTrace(); + assertionError.set(new AssertionError(e.getMessage())); + } + }); + writerThread.start(); + + // Reader threads + for (int i = 0; i < numReaderThreads; i++) { + new Thread(() -> { + try { + for (int j = 0; j < 10000; j++) { + // Verify the values + Result result = region.get(new Get(row)); + + // The values should be equals to either the initial values or the values after + // executing the mutations + String q1Value = Bytes.toString(result.getValue(fam1, q1)); + if (v1.equals(q1Value)) { + assertEquals(v2, Bytes.toString(result.getValue(fam1, q2))); + assertEquals(1L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("a", Bytes.toString(result.getValue(fam1, q4))); + } else if (v2.equals(q1Value)) { + assertNull(Bytes.toString(result.getValue(fam1, q2))); + assertEquals(2L, Bytes.toLong(result.getValue(fam1, q3))); + assertEquals("ab", Bytes.toString(result.getValue(fam1, q4))); + } else { + fail("the qualifier " + q1 + " should be " + v1 + " or " + v2 + ", but " + q1Value); + } + } + } catch (Exception e) { + e.printStackTrace(); + assertionError.set(new AssertionError(e.getMessage())); + } catch (AssertionError e) { + assertionError.set(e); + } + + latch.countDown(); + }).start(); + } + + writerThread.join(); + + if (assertionError.get() != null) { + throw assertionError.get(); + } + } + @Test public void testMutateRow_WriteRequestCount() throws Exception { byte[] row1 = Bytes.toBytes("row1"); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java index 81f41824867..487c2ca7d96 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java @@ -444,10 +444,11 @@ public class ThriftTable implements Table { } @Override - public void mutateRow(RowMutations rm) throws IOException { + public Result mutateRow(RowMutations rm) throws IOException { TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm); try { client.mutateRow(tableNameInBytes, tRowMutations); + return Result.EMPTY_RESULT; } catch (TException e) { throw new IOException(e); }