HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3064)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Toshihiro Suzuki 2021-03-19 21:25:40 +09:00 committed by GitHub
parent cc6c14a495
commit f4059907e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 438 additions and 144 deletions

View File

@ -172,7 +172,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else {
action = new Action(rawAction, i);
}
if (rawAction instanceof Append || rawAction instanceof Increment) {
if (hasIncrementOrAppend(rawAction)) {
action.setNonce(conn.getNonceGenerator().newNonce());
}
this.actions.add(action);
@ -184,6 +184,26 @@ class AsyncBatchRpcRetryingCaller<T> {
this.startNs = System.nanoTime();
}
private static boolean hasIncrementOrAppend(Row action) {
if (action instanceof Append || action instanceof Increment) {
return true;
} else if (action instanceof RowMutations) {
return hasIncrementOrAppend((RowMutations) action);
} else if (action instanceof CheckAndMutate) {
return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
}
return false;
}
private static boolean hasIncrementOrAppend(RowMutations mutations) {
for (Mutation mutation : mutations.getMutations()) {
if (mutation instanceof Append || mutation instanceof Increment) {
return true;
}
}
return false;
}
private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}

View File

@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
/**
* The implementation of RawAsyncTable.
@ -324,7 +323,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p),
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@ -336,7 +335,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d),
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@ -349,8 +348,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm), CheckAndMutateResult::isSuccess))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call();
}
}
@ -387,7 +387,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p),
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@ -398,7 +398,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, d),
filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@ -410,8 +410,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, rm), CheckAndMutateResult::isSuccess))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call();
}
}
@ -430,6 +431,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
}
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
mutation.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
@ -437,21 +440,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
controller, loc, stub, rowMutations,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), rm),
checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
} else {
@ -516,16 +521,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter
.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
.build();
}, resp -> resp))
(rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
.call();
}

View File

@ -203,39 +203,78 @@ public final class RequestConverter {
*/
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup,
long nonce) throws IOException {
MutateRequest.Builder builder = MutateRequest.newBuilder();
if (mutation instanceof Increment || mutation instanceof Append) {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce))
.setNonceGroup(nonceGroup);
} else {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
}
return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
/**
* Create a protocol buffer MutateRequest for conditioned row mutations
* Create a protocol buffer MultiRequest for conditioned row mutations
*
* @return a mutate request
* @return a multi request
* @throws IOException
*/
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations) throws IOException {
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
value, filter, timeRange), nonceGroup, nonce);
}
/**
* Create a protocol buffer MultiRequest for row mutations
*
* @return a multi request
*/
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce);
}
private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce)
throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
builder.setAtomic(true);
boolean hasNonce = false;
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
MutationProto mp;
if (mutation instanceof Increment || mutation instanceof Append) {
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce);
hasNonce = true;
} else {
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder);
}
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
if (condition != null) {
builder.setCondition(condition);
}
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
if (hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
return multiRequestBuilder.addRegionAction(builder.build()).build();
}
/**
@ -318,33 +357,6 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a protocol buffer MultiRequest for row mutations.
* Does not propagate Action absolute position. Does not set atomic action on the created
* RegionAtomic. Caller should do that if wanted.
* @param regionName
* @param rowMutations
* @return a data-laden RegionMutation.Builder
* @throws IOException
*/
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
final RowMutations rowMutations)
throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return builder;
}
public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@ -562,9 +574,6 @@ public final class RequestConverter {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
if (builder.getActionCount() > 0) {
multiRequestBuilder.addRegionAction(builder.build());
}
@ -578,8 +587,11 @@ public final class RequestConverter {
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
mutationBuilder);
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(),
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
if (hasIncrementOrAppend) {
hasNonce = true;
}
builder.setAtomic(true);
multiRequestBuilder.addRegionAction(builder.build());
@ -613,16 +625,21 @@ public final class RequestConverter {
} else if (cam.getAction() instanceof Increment) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
hasNonce = true;
} else if (cam.getAction() instanceof Append) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
hasNonce = true;
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(),
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
if (hasIncrementOrAppend) {
hasNonce = true;
}
builder.setAtomic(true);
} else {
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
@ -635,6 +652,10 @@ public final class RequestConverter {
// in the overall multiRequest.
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
}
private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
@ -684,18 +705,29 @@ public final class RequestConverter {
MutationType.APPEND, append, mutationBuilder, nonce)));
}
private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
/**
* @return whether or not the rowMutations has a Increment or Append
*/
private static boolean buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
boolean ret = false;
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
mutationBuilder);
MutationProto mp;
if (mutation instanceof Increment || mutation instanceof Append) {
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder,
nonce);
ret = true;
} else {
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder);
}
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
return ret;
}
private static MutationType getMutationType(Mutation mutation) {

View File

@ -1551,7 +1551,7 @@ public interface RegionObserver {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs.add(new Pair<>(pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(),
pair.getSecond())));
}
return resultPairs;

View File

@ -3297,8 +3297,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final Map<byte[], List<Cell>>[] familyCellMaps;
// For Increment/Append operations
protected final Result[] results;
// For nonce operations
protected final boolean[] canProceed;
protected final HRegion region;
protected int nextIndexToProcess = 0;
@ -3314,7 +3312,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.walEditsFromCoprocessors = new WALEdit[operations.length];
familyCellMaps = new Map[operations.length];
this.results = new Result[operations.length];
this.canProceed = new boolean[operations.length];
this.region = region;
observedExceptions = new ObservedExceptionsInBatch();
@ -3736,9 +3733,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
private static class MutationBatchOperation extends BatchOperation<Mutation> {
// For nonce operations
private long nonceGroup;
private long nonce;
protected boolean canProceed;
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
@ -3851,6 +3849,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
// For nonce operations
canProceed = startNonceOperation();
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
Mutation mutation = getMutation(index);
if (mutation instanceof Put) {
@ -3869,8 +3870,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// For nonce operations
canProceed[index] = startNonceOperation(nonceGroup, nonce);
if (!canProceed[index]) {
if (!canProceed) {
Result result;
if (returnResults) {
// convert duplicate increment/append to get
@ -3932,11 +3932,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Starts the nonce operation for a mutation, if needed.
* @param nonceGroup Nonce group from the request.
* @param nonce Nonce.
* @return whether to proceed this mutation.
*/
private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
private boolean startNonceOperation() throws IOException {
if (region.rsServices == null || region.rsServices.getNonceManager() == null
|| nonce == HConstants.NO_NONCE) {
return true;
@ -3953,11 +3951,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Ends nonce operation for a mutation, if needed.
* @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
* @param nonce Nonce.
* @param success Whether the operation for this nonce has succeeded.
*/
private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
private void endNonceOperation(boolean success) {
if (region.rsServices != null && region.rsServices.getNonceManager() != null
&& nonce != HConstants.NO_NONCE) {
region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
@ -4226,13 +4222,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// For nonce operations
visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
if (canProceed[i]) {
endNonceOperation(nonceGroup, nonce,
retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
}
return true;
});
if (canProceed && nonce != HConstants.NO_NONCE) {
boolean[] areAllIncrementsAndAppendsSuccessful = new boolean[]{true};
visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
Mutation mutation = getMutation(i);
if (mutation instanceof Increment || mutation instanceof Append) {
if (retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
areAllIncrementsAndAppendsSuccessful[0] = false;
return false;
}
}
return true;
});
endNonceOperation(areAllIncrementsAndAppendsSuccessful[0]);
}
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
@ -4490,7 +4493,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
long nonce) throws IOException {
// As it stands, this is used for 3 things
// * batchMutate with single mutation - put/delete/increment/append, separate or from
@ -4786,6 +4789,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
return checkAndMutate(checkAndMutate, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
long nonce) throws IOException {
byte[] row = checkAndMutate.getRow();
Filter filter = null;
byte[] family = null;
@ -4897,9 +4905,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// All edits for the given row (across all column families) must happen atomically.
Result r;
if (mutation != null) {
r = mutate(mutation, true).getResult();
r = mutate(mutation, true, nonceGroup, nonce).getResult();
} else {
r = mutateRow(rowMutations);
r = mutateRow(rowMutations, nonceGroup, nonce);
}
this.checkAndMutateChecksPassed.increment();
return new CheckAndMutateResult(true, r);
@ -7573,9 +7581,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public Result mutateRow(RowMutations rm) throws IOException {
return mutateRow(rm, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
public Result mutateRow(RowMutations rm, long nonceGroup, long nonce) throws IOException {
final List<Mutation> m = rm.getMutations();
OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
HConstants.NO_NONCE, HConstants.NO_NONCE);
OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, nonceGroup, nonce);
List<Result> results = new ArrayList<>();
for (OperationStatus status : statuses) {

View File

@ -602,43 +602,47 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
CellScanner cellScanner, Condition condition, long nonceGroup,
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
List<Mutation> mutations = new ArrayList<>();
long nonce = HConstants.NO_NONCE;
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
MutationType type = action.getMutation().getMutateType();
MutationProto mutation = action.getMutation();
MutationType type = mutation.getMutateType();
switch (type) {
case PUT:
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
Put put = ProtobufUtil.toPut(mutation, cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, put);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
mutations.add(put);
break;
case DELETE:
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
++countOfCompleteMutation;
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
mutations.add(del);
break;
case INCREMENT:
Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
++countOfCompleteMutation;
checkCellSizeLimit(region, increment);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
mutations.add(increment);
break;
case APPEND:
Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
Append append = ProtobufUtil.toAppend(mutation, cellScanner);
nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
++countOfCompleteMutation;
checkCellSizeLimit(region, append);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
@ -658,7 +662,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
result = region.checkAndMutate(checkAndMutate);
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
@ -913,21 +917,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
// Just throw the exception. The exception will be caught and then added to region-level
// exception for RegionAction. Leaving the null to action result is ok since the null
// result is viewed as failure by hbase client. And the region-lever exception will be used
// to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
// AsyncBatchRpcRetryingCaller#onComplete for more details.
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
}
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
try {
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
spaceQuotaEnforcement, false);
} catch (IOException e) {
// Set the exception for each action. The mutations in same RegionAction are group to
// different batch and then be processed individually. Hence, we don't set the region-level
@ -946,9 +951,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param mutations
*/
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
throws IOException {
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement,
boolean atomic) throws IOException {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
@ -962,6 +967,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
int i = 0;
long nonce = HConstants.NO_NONCE;
for (ClientProtos.Action action: mutations) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
@ -982,10 +988,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
case INCREMENT:
mutation = ProtobufUtil.toIncrement(m, cells);
nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
break;
case APPEND:
mutation = ProtobufUtil.toAppend(m, cells);
nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
break;
default:
@ -1010,7 +1018,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
}
OperationStatus[] codes = region.batchMutate(mArray, atomic);
OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
// When atomic is true, it indicates that the mutateRow API or the batch API with
// RowMutations is called. In this case, we need to merge the results of the
@ -2810,7 +2818,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, request.getCondition(), spaceQuotaEnforcement);
cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
responseBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
@ -2878,7 +2886,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionAction.getActionCount() == 1) {
CheckAndMutateResult result = checkAndMutate(region, quota,
regionAction.getAction(0).getMutation(), cellScanner,
regionAction.getCondition(), spaceQuotaEnforcement);
regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
resultOrExceptionOrBuilder.setIndex(0);
if (result.getResult() != null) {
@ -2887,7 +2895,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
} else {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
@ -2920,7 +2928,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
cellScanner, spaceQuotaEnforcement);
cellScanner, nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(true);
// We no longer use MultiResponse#processed. Instead, we use
// RegionActionResult#processed. This is for backward compatibility for old clients.
@ -3042,7 +3050,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
request.getCondition(), spaceQuotaEnforcement);
request.getCondition(), nonceGroup, spaceQuotaEnforcement);
builder.setProcessed(result.isSuccess());
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
@ -3126,11 +3134,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
MutationProto mutation, CellScanner cellScanner, Condition condition,
MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
ActivePolicyEnforcement spaceQuota) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
cellScanner);
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
quota.addMutation((Mutation) checkAndMutate.getAction());
@ -3140,7 +3149,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
result = region.checkAndMutate(checkAndMutate);
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -57,13 +60,17 @@ public class TestAsyncTableNoncedRetry {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static final TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static final byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] VALUE = Bytes.toBytes("value");
private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
private static final byte[] VALUE = Bytes.toBytes("value");
private static AsyncConnection ASYNC_CONN;
@ -72,9 +79,14 @@ public class TestAsyncTableNoncedRetry {
private byte[] row;
private static AtomicInteger CALLED = new AtomicInteger();
private static final AtomicInteger CALLED = new AtomicInteger();
private static long SLEEP_TIME = 2000;
private static final long SLEEP_TIME = 2000;
private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
// The number of miniBatchOperations that are executed in a RegionServer
private static int miniBatchOperationCount;
public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
@ -84,21 +96,12 @@ public class TestAsyncTableNoncedRetry {
}
@Override
public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
Result result) throws IOException {
if (CALLED.getAndIncrement() == 0) {
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) {
// We sleep when the last of the miniBatchOperation is executed
if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
Threads.sleepWithoutInterrupt(SLEEP_TIME);
}
return RegionObserver.super.postAppend(c, append, result);
}
@Override
public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
Increment increment, Result result) throws IOException {
if (CALLED.getAndIncrement() == 0) {
Threads.sleepWithoutInterrupt(SLEEP_TIME);
}
return RegionObserver.super.postIncrement(c, increment, result);
}
}
@ -129,8 +132,11 @@ public class TestAsyncTableNoncedRetry {
public void testAppend() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
@ -141,9 +147,12 @@ public class TestAsyncTableNoncedRetry {
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
.setReturnResults(false)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
@ -153,10 +162,14 @@ public class TestAsyncTableNoncedRetry {
public void testIncrement() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertEquals(1L, result);
}
@Test
@ -164,11 +177,218 @@ public class TestAsyncTableNoncedRetry {
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
.setReturnResults(false)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
}
@Test
public void testIncrementInRowMutations()
throws InterruptedException, ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.mutateRow(new RowMutations(row)
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
}
@Test
public void testAppendInRowMutations()
throws InterruptedException, ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.mutateRow(new RowMutations(row)
.add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
.add(new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
@Test
public void testIncrementAndAppendInRowMutations()
throws InterruptedException, ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
Result result = table.mutateRow(new RowMutations(row)
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
}
@Test
public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER2)
.build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
}
@Test
public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER2)
.build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isSuccess());
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
}
@Test
public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException,
ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER3)
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
}
@Test
public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER3)
.build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
.add(new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isSuccess());
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
}
@Test
public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
ExecutionException, IOException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 1;
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER3)
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
}
@Test
public void testBatch() throws InterruptedException,
ExecutionException, IOException {
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
miniBatchOperationCount = 6;
List<Object> results = table.batchAll(Arrays.asList(
new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
new RowMutations(row3)
.add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
CheckAndMutate.newBuilder(row4)
.ifNotExists(FAMILY, QUALIFIER2)
.build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
CheckAndMutate.newBuilder(row5)
.ifNotExists(FAMILY, QUALIFIER2)
.build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
CheckAndMutate.newBuilder(row6)
.ifNotExists(FAMILY, QUALIFIER3)
.build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
.add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))).get();
// make sure we called twice and the result is still correct
// should be called 12 times as 6 miniBatchOperations are called twice
assertEquals(12, CALLED.get());
assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
CheckAndMutateResult result;
result = (CheckAndMutateResult) results.get(3);
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
result = (CheckAndMutateResult) results.get(4);
assertTrue(result.isSuccess());
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
result = (CheckAndMutateResult) results.get(5);
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
}
}