From 2b5c2c36d99e8ecb7e7a345d83b1551999632f4a Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Sun, 21 Mar 2021 22:47:53 +0900 Subject: [PATCH] HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3073) Signed-off-by: stack --- .../client/AsyncBatchRpcRetryingCaller.java | 22 +- .../hadoop/hbase/client/AsyncProcess.java | 25 +- .../apache/hadoop/hbase/client/HTable.java | 36 +- .../hbase/client/RawAsyncTableImpl.java | 40 +- .../shaded/protobuf/RequestConverter.java | 138 ++++--- .../hbase/coprocessor/RegionObserver.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 59 +-- .../hbase/regionserver/RSRpcServices.java | 53 +-- .../client/TestAsyncTableNoncedRetry.java | 268 +++++++++++-- .../hbase/client/TestHTableNoncedRetry.java | 365 ++++++++++++++++++ 10 files changed, 850 insertions(+), 158 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7e05b05c815..7af385da2d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -172,7 +172,7 @@ class AsyncBatchRpcRetryingCaller { } else { action = new Action(rawAction, i); } - if (rawAction instanceof Append || rawAction instanceof Increment) { + if (hasIncrementOrAppend(rawAction)) { action.setNonce(conn.getNonceGenerator().newNonce()); } this.actions.add(action); @@ -184,6 +184,26 @@ class AsyncBatchRpcRetryingCaller { this.startNs = System.nanoTime(); } + private static boolean hasIncrementOrAppend(Row action) { + if (action instanceof Append || action instanceof Increment) { + return true; + } else if (action instanceof RowMutations) { + return hasIncrementOrAppend((RowMutations) action); + } else if (action instanceof CheckAndMutate) { + return hasIncrementOrAppend(((CheckAndMutate) action).getAction()); + } + return false; + } + + private static boolean hasIncrementOrAppend(RowMutations mutations) { + for (Mutation mutation : mutations.getMutations()) { + if (mutation instanceof Append || mutation instanceof Increment) { + return true; + } + } + return false; + } + private long remainingTimeNs() { return operationTimeoutNs - (System.nanoTime() - startNs); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 8cd046f2006..6071cb63645 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -398,8 +398,29 @@ class AsyncProcess { } private void setNonce(NonceGenerator ng, Row r, Action action) { - if (!(r instanceof Append) && !(r instanceof Increment)) return; - action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. + if (hasIncrementOrAppend(r)) { + action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. + } + } + + private static boolean hasIncrementOrAppend(Row action) { + if (action instanceof Append || action instanceof Increment) { + return true; + } else if (action instanceof RowMutations) { + return hasIncrementOrAppend((RowMutations) action); + } else if (action instanceof CheckAndMutate) { + return hasIncrementOrAppend(((CheckAndMutate) action).getAction()); + } + return false; + } + + private static boolean hasIncrementOrAppend(RowMutations mutations) { + for (Mutation mutation : mutations.getMutations()) { + if (mutation instanceof Append || mutation instanceof Increment) { + return true; + } + } + return false; } private int checkTimeout(String name, int timeout) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a219fedd939..a04fd26df65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -555,17 +554,16 @@ public class HTable implements Table { @Override public Result mutateRow(final RowMutations rm) throws IOException { + long nonceGroup = getNonceGroup(); + long nonce = getNonce(); CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), rm.getMaxPriority()) { @Override protected MultiResponse rpcCall() throws Exception { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + MultiRequest request = RequestConverter.buildMultiRequest( + getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce); ClientProtos.MultiResponse response = doMulti(request); ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); if (res.hasException()) { @@ -597,6 +595,14 @@ public class HTable implements Table { return (Result) results[0]; } + private long getNonceGroup() { + return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup(); + } + + private long getNonce() { + return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce(); + } + @Override public Result append(final Append append) throws IOException { checkHasFamilies(append); @@ -606,7 +612,8 @@ public class HTable implements Table { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); + getLocation().getRegionInfo().getRegionName(), append, super.getNonceGroup(), + super.getNonce()); MutateResponse response = doMutate(request); if (!response.hasResult()) return null; return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); @@ -625,7 +632,8 @@ public class HTable implements Table { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); + getLocation().getRegionInfo().getRegionName(), increment, super.getNonceGroup(), + super.getNonce()); MutateResponse response = doMutate(request); // Should this check for null like append does? return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); @@ -664,7 +672,7 @@ public class HTable implements Table { protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, getNonceGroup(), getNonce()); + qualifier, amount, durability, super.getNonceGroup(), super.getNonce()); MutateResponse response = doMutate(request); Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); @@ -737,6 +745,8 @@ public class HTable implements Table { private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange, final RowMutations rm) throws IOException { + long nonceGroup = getNonceGroup(); + long nonce = getNonce(); CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), @@ -744,8 +754,8 @@ public class HTable implements Table { @Override protected MultiResponse rpcCall() throws Exception { MultiRequest request = RequestConverter - .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, op, value, filter, timeRange, rm); + .buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce); ClientProtos.MultiResponse response = doMulti(request); ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); if (res.hasException()) { @@ -822,6 +832,8 @@ public class HTable implements Table { private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException { + long nonceGroup = getNonceGroup(); + long nonce = getNonce(); ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, this.rpcControllerFactory.newController(), mutation.getPriority()) { @@ -829,7 +841,7 @@ public class HTable implements Table { protected CheckAndMutateResult rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value, - filter, timeRange, mutation); + filter, timeRange, mutation, nonceGroup, nonce); MutateResponse response = doMutate(request); if (response.hasResult()) { return new CheckAndMutateResult(response.getProcessed(), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 1222d83d054..bed896eaa45 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; /** * The implementation of RawAsyncTable. @@ -362,7 +361,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, p), + null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(); } @@ -374,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, d), + null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(); } @@ -387,8 +386,9 @@ class RawAsyncTableImpl implements AsyncTable { rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, - null, timeRange, rm), CheckAndMutateResult::isSuccess)) + (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, + null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), + CheckAndMutateResult::isSuccess)) .call(); } } @@ -425,7 +425,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, p), + filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(); } @@ -436,7 +436,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, d), + filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(); } @@ -448,8 +448,9 @@ class RawAsyncTableImpl implements AsyncTable { rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, - filter, timeRange, rm), CheckAndMutateResult::isSuccess)) + (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, + filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), + CheckAndMutateResult::isSuccess)) .call(); } } @@ -468,6 +469,8 @@ class RawAsyncTableImpl implements AsyncTable { if (mutation instanceof Put) { validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); } + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), mutation.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, @@ -475,21 +478,23 @@ class RawAsyncTableImpl implements AsyncTable { (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), checkAndMutate.getFamily(), checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), - checkAndMutate.getTimeRange(), m), + checkAndMutate.getTimeRange(), m, nonceGroup, nonce), (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) .call(); } else if (checkAndMutate.getAction() instanceof RowMutations) { RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this. mutateRow( controller, loc, stub, rowMutations, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), checkAndMutate.getFamily(), checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), - checkAndMutate.getTimeRange(), rm), + checkAndMutate.getTimeRange(), rm, nonceGroup, nonce), resp -> resp)) .call(); } else { @@ -554,16 +559,13 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture mutateRow(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); return this. newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs).action((controller, loc, stub) -> this. mutateRow(controller, loc, stub, mutations, - (rn, rm) -> { - RegionAction.Builder regionMutationBuilder = RequestConverter - .buildRegionAction(rn, rm); - regionMutationBuilder.setAtomic(true); - return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()) - .build(); - }, resp -> resp)) + (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), + resp -> resp)) .call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 54b55cc97b0..21644f4373e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -247,39 +247,78 @@ public final class RequestConverter { */ public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, - final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException { - return MutateRequest.newBuilder() - .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) - .setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation)) + final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup, + long nonce) throws IOException { + MutateRequest.Builder builder = MutateRequest.newBuilder(); + if (mutation instanceof Increment || mutation instanceof Append) { + builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce)) + .setNonceGroup(nonceGroup); + } else { + builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation)); + } + return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) .build(); } /** - * Create a protocol buffer MutateRequest for conditioned row mutations + * Create a protocol buffer MultiRequest for conditioned row mutations * - * @return a mutate request + * @return a multi request * @throws IOException */ - public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName, + public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName, final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange, - final RowMutations rowMutations) throws IOException { + final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException { + return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op, + value, filter, timeRange), nonceGroup, nonce); + } + + /** + * Create a protocol buffer MultiRequest for row mutations + * + * @return a multi request + */ + public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName, + final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException { + return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce); + } + + private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName, + final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce) + throws IOException { RegionAction.Builder builder = - getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); + getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); builder.setAtomic(true); + + boolean hasNonce = false; ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); for (Mutation mutation: rowMutations.getMutations()) { mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, - mutationBuilder); + MutationProto mp; + if (mutation instanceof Increment || mutation instanceof Append) { + mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce); + hasNonce = true; + } else { + mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder); + } actionBuilder.clear(); actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition( - buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build(); + + if (condition != null) { + builder.setCondition(condition); + } + + MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); + if (hasNonce) { + multiRequestBuilder.setNonceGroup(nonceGroup); + } + + return multiRequestBuilder.addRegionAction(builder.build()).build(); } /** @@ -362,33 +401,6 @@ public final class RequestConverter { return builder.build(); } - /** - * Create a protocol buffer MultiRequest for row mutations. - * Does not propagate Action absolute position. Does not set atomic action on the created - * RegionAtomic. Caller should do that if wanted. - * @param regionName - * @param rowMutations - * @return a data-laden RegionMutation.Builder - * @throws IOException - */ - public static RegionAction.Builder buildRegionAction(final byte [] regionName, - final RowMutations rowMutations) - throws IOException { - RegionAction.Builder builder = - getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); - ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); - MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); - for (Mutation mutation: rowMutations.getMutations()) { - mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, - mutationBuilder); - actionBuilder.clear(); - actionBuilder.setMutation(mp); - builder.addAction(actionBuilder.build()); - } - return builder; - } - public static RegionAction.Builder getRegionActionBuilderWithRegion( final RegionAction.Builder regionActionBuilder, final byte [] regionName) { RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); @@ -785,9 +797,6 @@ public final class RequestConverter { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } } - if (!multiRequestBuilder.hasNonceGroup() && hasNonce) { - multiRequestBuilder.setNonceGroup(nonceGroup); - } if (builder.getActionCount() > 0) { multiRequestBuilder.addRegionAction(builder.build()); } @@ -801,8 +810,11 @@ public final class RequestConverter { builder.clear(); getRegionActionBuilderWithRegion(builder, regionName); - buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder, - mutationBuilder); + boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(), + cells, action.getNonce(), builder, actionBuilder, mutationBuilder); + if (hasIncrementOrAppend) { + hasNonce = true; + } builder.setAtomic(true); multiRequestBuilder.addRegionAction(builder.build()); @@ -836,16 +848,21 @@ public final class RequestConverter { } else if (cam.getAction() instanceof Increment) { actionBuilder.clear(); mutationBuilder.clear(); - buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder, + buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder, actionBuilder, mutationBuilder); + hasNonce = true; } else if (cam.getAction() instanceof Append) { actionBuilder.clear(); mutationBuilder.clear(); - buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder, + buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder, actionBuilder, mutationBuilder); + hasNonce = true; } else if (cam.getAction() instanceof RowMutations) { - buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder, - mutationBuilder); + boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(), + cells, action.getNonce(), builder, actionBuilder, mutationBuilder); + if (hasIncrementOrAppend) { + hasNonce = true; + } builder.setAtomic(true); } else { throw new DoNotRetryIOException("CheckAndMutate doesn't support " + @@ -858,6 +875,10 @@ public final class RequestConverter { // in the overall multiRequest. indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); } + + if (!multiRequestBuilder.hasNonceGroup() && hasNonce) { + multiRequestBuilder.setNonceGroup(nonceGroup); + } } private static void buildNoDataRegionAction(final Put put, final List cells, @@ -907,18 +928,29 @@ public final class RequestConverter { MutationType.APPEND, append, mutationBuilder, nonce))); } - private static void buildNoDataRegionAction(final RowMutations rowMutations, - final List cells, final RegionAction.Builder regionActionBuilder, + /** + * @return whether or not the rowMutations has a Increment or Append + */ + private static boolean buildNoDataRegionAction(final RowMutations rowMutations, + final List cells, long nonce, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) throws IOException { + boolean ret = false; for (Mutation mutation: rowMutations.getMutations()) { mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, - mutationBuilder); + MutationProto mp; + if (mutation instanceof Increment || mutation instanceof Append) { + mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder, + nonce); + ret = true; + } else { + mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder); + } cells.add(mutation); actionBuilder.clear(); regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); } + return ret; } private static MutationType getMutationType(Mutation mutation) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 2bd766557fe..a4f3f08261d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1551,7 +1551,7 @@ public interface RegionObserver { List> resultPairs = new ArrayList<>(cellPairs.size()); for (Pair pair : cellPairs) { resultPairs.add(new Pair<>(pair.getFirst(), - postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(), + postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(), pair.getSecond()))); } return resultPairs; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5bfeff72ad9..9220b2faaab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3259,8 +3259,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final Map>[] familyCellMaps; // For Increment/Append operations protected final Result[] results; - // For nonce operations - protected final boolean[] canProceed; protected final HRegion region; protected int nextIndexToProcess = 0; @@ -3276,7 +3274,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.walEditsFromCoprocessors = new WALEdit[operations.length]; familyCellMaps = new Map[operations.length]; this.results = new Result[operations.length]; - this.canProceed = new boolean[operations.length]; this.region = region; observedExceptions = new ObservedExceptionsInBatch(); @@ -3698,9 +3695,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private static class MutationBatchOperation extends BatchOperation { + // For nonce operations private long nonceGroup; - private long nonce; + protected boolean canProceed; public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic, long nonceGroup, long nonce) { @@ -3813,6 +3811,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void prepareMiniBatchOperations(MiniBatchOperationInProgress miniBatchOp, long timestamp, final List acquiredRowLocks) throws IOException { + // For nonce operations + canProceed = startNonceOperation(); + visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> { Mutation mutation = getMutation(index); if (mutation instanceof Put) { @@ -3831,8 +3832,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // For nonce operations - canProceed[index] = startNonceOperation(nonceGroup, nonce); - if (!canProceed[index]) { + if (!canProceed) { Result result; if (returnResults) { // convert duplicate increment/append to get @@ -3894,11 +3894,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Starts the nonce operation for a mutation, if needed. - * @param nonceGroup Nonce group from the request. - * @param nonce Nonce. * @return whether to proceed this mutation. */ - private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException { + private boolean startNonceOperation() throws IOException { if (region.rsServices == null || region.rsServices.getNonceManager() == null || nonce == HConstants.NO_NONCE) { return true; @@ -3915,11 +3913,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Ends nonce operation for a mutation, if needed. - * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. - * @param nonce Nonce. * @param success Whether the operation for this nonce has succeeded. */ - private void endNonceOperation(long nonceGroup, long nonce, boolean success) { + private void endNonceOperation(boolean success) { if (region.rsServices != null && region.rsServices.getNonceManager() != null && nonce != HConstants.NO_NONCE) { region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success); @@ -4188,13 +4184,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // For nonce operations - visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> { - if (canProceed[i]) { - endNonceOperation(nonceGroup, nonce, - retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS); - } - return true; - }); + if (canProceed && nonce != HConstants.NO_NONCE) { + boolean[] areAllIncrementsAndAppendsSuccessful = new boolean[]{true}; + visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> { + Mutation mutation = getMutation(i); + if (mutation instanceof Increment || mutation instanceof Append) { + if (retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + areAllIncrementsAndAppendsSuccessful[0] = false; + return false; + } + } + return true; + }); + endNonceOperation(areAllIncrementsAndAppendsSuccessful[0]); + } // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. @@ -4452,7 +4455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup, long nonce) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete/increment/append, separate or from @@ -4748,6 +4751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + return checkAndMutate(checkAndMutate, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup, + long nonce) throws IOException { byte[] row = checkAndMutate.getRow(); Filter filter = null; byte[] family = null; @@ -4859,9 +4867,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // All edits for the given row (across all column families) must happen atomically. Result r; if (mutation != null) { - r = mutate(mutation, true).getResult(); + r = mutate(mutation, true, nonceGroup, nonce).getResult(); } else { - r = mutateRow(rowMutations); + r = mutateRow(rowMutations, nonceGroup, nonce); } this.checkAndMutateChecksPassed.increment(); return new CheckAndMutateResult(true, r); @@ -7517,9 +7525,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Result mutateRow(RowMutations rm) throws IOException { + return mutateRow(rm, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + public Result mutateRow(RowMutations rm, long nonceGroup, long nonce) throws IOException { final List m = rm.getMutations(); - OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, - HConstants.NO_NONCE, HConstants.NO_NONCE); + OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, nonceGroup, nonce); List results = new ArrayList<>(); for (OperationStatus status : statuses) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7362a0ea054..a5cdccb4efe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -598,43 +598,47 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private CheckAndMutateResult checkAndMutate(HRegion region, List actions, - CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement) - throws IOException { + CellScanner cellScanner, Condition condition, long nonceGroup, + ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { int countOfCompleteMutation = 0; try { if (!region.getRegionInfo().isMetaRegion()) { regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); } List mutations = new ArrayList<>(); + long nonce = HConstants.NO_NONCE; for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); } - MutationType type = action.getMutation().getMutateType(); + MutationProto mutation = action.getMutation(); + MutationType type = mutation.getMutateType(); switch (type) { case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); + Put put = ProtobufUtil.toPut(mutation, cellScanner); ++countOfCompleteMutation; checkCellSizeLimit(region, put); spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); mutations.add(put); break; case DELETE: - Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); + Delete del = ProtobufUtil.toDelete(mutation, cellScanner); ++countOfCompleteMutation; spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); mutations.add(del); break; case INCREMENT: - Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner); + Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner); + nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; ++countOfCompleteMutation; checkCellSizeLimit(region, increment); spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); mutations.add(increment); break; case APPEND: - Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner); + Append append = ProtobufUtil.toAppend(mutation, cellScanner); + nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; ++countOfCompleteMutation; checkCellSizeLimit(region, append); spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); @@ -654,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); } if (result == null) { - result = region.checkAndMutate(checkAndMutate); + result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); } @@ -909,21 +913,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List mutations, - final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) + final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { // Just throw the exception. The exception will be caught and then added to region-level // exception for RegionAction. Leaving the null to action result is ok since the null // result is viewed as failure by hbase client. And the region-lever exception will be used // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and // AsyncBatchRpcRetryingCaller#onComplete for more details. - doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true); + doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true); } private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List mutations, final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { try { - doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false); + doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE, + spaceQuotaEnforcement, false); } catch (IOException e) { // Set the exception for each action. The mutations in same RegionAction are group to // different batch and then be processed individually. Hence, we don't set the region-level @@ -942,9 +947,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param mutations */ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, - final OperationQuota quota, final List mutations, - final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) - throws IOException { + final OperationQuota quota, final List mutations, + final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, + boolean atomic) throws IOException { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -958,6 +963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ Map mutationActionMap = new HashMap<>(); int i = 0; + long nonce = HConstants.NO_NONCE; for (ClientProtos.Action action: mutations) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + @@ -978,10 +984,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, case INCREMENT: mutation = ProtobufUtil.toIncrement(m, cells); + nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; break; case APPEND: mutation = ProtobufUtil.toAppend(m, cells); + nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; break; default: @@ -1006,7 +1014,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); } - OperationStatus[] codes = region.batchMutate(mArray, atomic); + OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); // When atomic is true, it indicates that the mutateRow API or the batch API with // RowMutations is called. In this case, we need to merge the results of the @@ -2758,7 +2766,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), - cellScanner, request.getCondition(), spaceQuotaEnforcement); + cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement); responseBuilder.setProcessed(result.isSuccess()); ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = ClientProtos.ResultOrException.newBuilder(); @@ -2815,7 +2823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionAction.getActionCount() == 1) { CheckAndMutateResult result = checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner, - regionAction.getCondition(), spaceQuotaEnforcement); + regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement); regionActionResultBuilder.setProcessed(result.isSuccess()); resultOrExceptionOrBuilder.setIndex(0); if (result.getResult() != null) { @@ -2824,7 +2832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); } else { CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), - cellScanner, regionAction.getCondition(), spaceQuotaEnforcement); + cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement); regionActionResultBuilder.setProcessed(result.isSuccess()); for (int i = 0; i < regionAction.getActionCount(); i++) { if (i == 0 && result.getResult() != null) { @@ -2850,7 +2858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { try { doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), - cellScanner, spaceQuotaEnforcement); + cellScanner, nonceGroup, spaceQuotaEnforcement); regionActionResultBuilder.setProcessed(true); // We no longer use MultiResponse#processed. Instead, we use // RegionActionResult#processed. This is for backward compatibility for old clients. @@ -2963,7 +2971,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasCondition()) { CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, - request.getCondition(), spaceQuotaEnforcement); + request.getCondition(), nonceGroup, spaceQuotaEnforcement); builder.setProcessed(result.isSuccess()); boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, result.getResult(), controller, clientCellBlockSupported); @@ -3047,11 +3055,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, - MutationProto mutation, CellScanner cellScanner, Condition condition, + MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup, ActivePolicyEnforcement spaceQuota) throws IOException { long before = EnvironmentEdgeManager.currentTime(); CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner); + long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction()); quota.addMutation((Mutation) checkAndMutate.getAction()); @@ -3061,7 +3070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); } if (result == null) { - result = region.checkAndMutate(checkAndMutate); + result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 82cc1a8b961..62a4499eee3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -57,13 +60,17 @@ public class TestAsyncTableNoncedRetry { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static TableName TABLE_NAME = TableName.valueOf("async"); + private static final TableName TABLE_NAME = TableName.valueOf("async"); - private static byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] FAMILY = Bytes.toBytes("cf"); - private static byte[] QUALIFIER = Bytes.toBytes("cq"); + private static final byte[] QUALIFIER = Bytes.toBytes("cq"); - private static byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2"); + + private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3"); + + private static final byte[] VALUE = Bytes.toBytes("value"); private static AsyncConnection ASYNC_CONN; @@ -72,9 +79,14 @@ public class TestAsyncTableNoncedRetry { private byte[] row; - private static AtomicInteger CALLED = new AtomicInteger(); + private static final AtomicInteger CALLED = new AtomicInteger(); - private static long SLEEP_TIME = 2000; + private static final long SLEEP_TIME = 2000; + + private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time + + // The number of miniBatchOperations that are executed in a RegionServer + private static int miniBatchOperationCount; public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { @@ -84,21 +96,12 @@ public class TestAsyncTableNoncedRetry { } @Override - public Result postAppend(ObserverContext c, Append append, - Result result) throws IOException { - if (CALLED.getAndIncrement() == 0) { + public void postBatchMutate(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) { + // We sleep when the last of the miniBatchOperations is executed + if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) { Threads.sleepWithoutInterrupt(SLEEP_TIME); } - return RegionObserver.super.postAppend(c, append, result); - } - - @Override - public Result postIncrement(ObserverContext c, - Increment increment, Result result) throws IOException { - if (CALLED.getAndIncrement() == 0) { - Threads.sleepWithoutInterrupt(SLEEP_TIME); - } - return RegionObserver.super.postIncrement(c, increment, result); } } @@ -129,8 +132,11 @@ public class TestAsyncTableNoncedRetry { public void testAppend() throws InterruptedException, ExecutionException { assertEquals(0, CALLED.get()); AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) - .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + // make sure we called twice and the result is still correct assertEquals(2, CALLED.get()); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); @@ -141,9 +147,12 @@ public class TestAsyncTableNoncedRetry { ExecutionException { assertEquals(0, CALLED.get()); AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) - .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE) .setReturnResults(false)).get(); + // make sure we called twice and the result is still correct assertEquals(2, CALLED.get()); assertTrue(result.isEmpty()); @@ -153,10 +162,14 @@ public class TestAsyncTableNoncedRetry { public void testIncrement() throws InterruptedException, ExecutionException { assertEquals(0, CALLED.get()); AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) - .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); - assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get(); + // make sure we called twice and the result is still correct assertEquals(2, CALLED.get()); + assertEquals(1L, result); } @Test @@ -164,11 +177,218 @@ public class TestAsyncTableNoncedRetry { ExecutionException { assertEquals(0, CALLED.get()); AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) - .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build(); + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L) .setReturnResults(false)).get(); + // make sure we called twice and the result is still correct assertEquals(2, CALLED.get()); assertTrue(result.isEmpty()); } + + @Test + public void testIncrementInRowMutations() + throws InterruptedException, ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInRowMutations() + throws InterruptedException, ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementAndAppendInRowMutations() + throws InterruptedException, ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2)); + } + + @Test + public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException, + ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException, + ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException, + ExecutionException, IOException { + assertEquals(0, CALLED.get()); + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))).get(); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); + } + + @Test + public void testBatch() throws InterruptedException, + ExecutionException, IOException { + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5"); + byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6"); + + assertEquals(0, CALLED.get()); + + AsyncTable table = ASYNC_CONN.getTableBuilder(TABLE_NAME) + .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build(); + + miniBatchOperationCount = 6; + List results = table.batchAll(Arrays.asList( + new Append(row).addColumn(FAMILY, QUALIFIER, VALUE), + new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L), + new RowMutations(row3) + .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)), + CheckAndMutate.newBuilder(row4) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)), + CheckAndMutate.newBuilder(row5) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)), + CheckAndMutate.newBuilder(row6) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))).get(); + + // make sure we called twice and the result is still correct + + // should be called 12 times as 6 miniBatchOperations are called twice + assertEquals(12, CALLED.get()); + + assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER)); + + assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER))); + + assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2)); + + CheckAndMutateResult result; + + result = (CheckAndMutateResult) results.get(3); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + + result = (CheckAndMutateResult) results.get(4); + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + + result = (CheckAndMutateResult) results.get(5); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java new file mode 100644 index 00000000000..51ecba9bb64 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestHTableNoncedRetry { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHTableNoncedRetry.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("async"); + + private static final byte[] FAMILY = Bytes.toBytes("cf"); + + private static final byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2"); + + private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3"); + + private static final byte[] VALUE = Bytes.toBytes("value"); + + private static Connection CONN; + + @Rule + public TestName testName = new TestName(); + + private byte[] row; + + private Table table; + + private static final AtomicInteger CALLED = new AtomicInteger(); + + private static final int SLEEP_TIME = 2000; + + private static final int RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time + + // The number of miniBatchOperations that are executed in a RegionServer + private static int miniBatchOperationCount; + + public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void postBatchMutate(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) { + // We sleep when the last of the miniBatchOperations is executed + if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) { + Threads.sleepWithoutInterrupt(SLEEP_TIME); + } + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setCoprocessor(SleepOnceCP.class.getName()).build()); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + Closeables.close(CONN, true); + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException, InterruptedException { + row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); + CALLED.set(0); + + table = CONN.getTable(TABLE_NAME); + table.setRpcTimeout(RPC_TIMEOUT); + } + + @After + public void tearDown() throws Exception { + table.close(); + } + + @Test + public void testAppend() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testAppendWhenReturnResultsEqualsFalse() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE) + .setReturnResults(false)); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isEmpty()); + } + + @Test + public void testIncrement() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertEquals(1L, result); + } + + @Test + public void testIncrementWhenReturnResultsEqualsFalse() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L) + .setReturnResults(false)); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isEmpty()); + } + + @Test + public void testIncrementInRowMutations() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInRowMutations() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementAndAppendInRowMutations() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + Result result = table.mutateRow(new RowMutations(row) + .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2)); + } + + @Test + public void testIncrementInCheckAndMutate() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInCheckAndMutate() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementInRowMutationsInCheckAndMutate() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + } + + @Test + public void testAppendInRowMutationsInCheckAndMutate() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)) + .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + } + + @Test + public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws IOException { + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 1; + CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))); + + // make sure we called twice and the result is still correct + assertEquals(2, CALLED.get()); + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); + } + + @Test + public void testBatch() throws IOException, InterruptedException { + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5"); + byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6"); + + assertEquals(0, CALLED.get()); + + miniBatchOperationCount = 6; + Object[] results = new Object[6]; + table.batch(Arrays.asList( + new Append(row).addColumn(FAMILY, QUALIFIER, VALUE), + new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L), + new RowMutations(row3) + .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)), + CheckAndMutate.newBuilder(row4) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)), + CheckAndMutate.newBuilder(row5) + .ifNotExists(FAMILY, QUALIFIER2) + .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)), + CheckAndMutate.newBuilder(row6) + .ifNotExists(FAMILY, QUALIFIER3) + .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L)) + .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))), results); + + // make sure we called twice and the result is still correct + + // should be called 12 times as 6 miniBatchOperations are called twice + assertEquals(12, CALLED.get()); + + assertArrayEquals(VALUE, ((Result) results[0]).getValue(FAMILY, QUALIFIER)); + + assertEquals(1L, Bytes.toLong(((Result) results[1]).getValue(FAMILY, QUALIFIER))); + + assertEquals(1L, Bytes.toLong(((Result) results[2]).getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, ((Result) results[2]).getValue(FAMILY, QUALIFIER2)); + + CheckAndMutateResult result; + + result = (CheckAndMutateResult) results[3]; + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + + result = (CheckAndMutateResult) results[4]; + assertTrue(result.isSuccess()); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER)); + + result = (CheckAndMutateResult) results[5]; + assertTrue(result.isSuccess()); + assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER))); + assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2)); + } +}