HBASE-24602 Add Increment and Append support to CheckAndMutate (#2228)

This commit is contained in:
Toshihiro Suzuki 2020-09-08 15:05:47 +09:00 committed by GitHub
parent a589e55a7b
commit d48c732851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1256 additions and 548 deletions

View File

@ -33,8 +33,7 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
* and {@link RowMutations} are supported.
* Used to perform CheckAndMutate operations.
* <p>
* Use the builder class to instantiate a CheckAndMutate object.
* This builder class is fluent style APIs, the code are like:
@ -139,9 +138,9 @@ public final class CheckAndMutate extends Mutation {
}
private void preCheck(Row action) {
Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
Preconditions.checkNotNull(action, "action is null");
if (!Bytes.equals(row, action.getRow())) {
throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
throw new IllegalArgumentException("The row of the action <" +
Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
@ -176,6 +175,32 @@ public final class CheckAndMutate extends Mutation {
}
}
/**
* @param increment data to increment if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Increment increment) {
preCheck(increment);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, increment);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment);
}
}
/**
* @param append data to append if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Append append) {
preCheck(append);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, append);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append);
}
}
/**
* @param mutation mutations to perform if check succeeds
* @return a CheckAndMutate object

View File

@ -423,8 +423,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete) {
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|| checkAndMutate.getAction() instanceof Increment
|| checkAndMutate.getAction() instanceof Append) {
Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
@ -437,7 +438,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r)))
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();

View File

@ -3578,6 +3578,10 @@ public final class ProtobufUtil {
return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
case DELETE:
return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
case INCREMENT:
return builder.build(ProtobufUtil.toIncrement(mutation, cellScanner));
case APPEND:
return builder.build(ProtobufUtil.toAppend(mutation, cellScanner));
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}

View File

@ -196,7 +196,7 @@ public final class RequestConverter {
}
/**
* Create a protocol buffer MutateRequest for a conditioned put/delete
* Create a protocol buffer MutateRequest for a conditioned put/delete/increment/append
*
* @return a mutate request
* @throws IOException
@ -204,15 +204,9 @@ public final class RequestConverter {
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else {
type = MutationType.DELETE;
}
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@ -553,16 +547,12 @@ public final class RequestConverter {
} else if (row instanceof Delete) {
buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
buildNoDataRegionAction((Append) row, cells, action.getNonce(), builder, actionBuilder,
mutationBuilder);
hasNonce = true;
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
buildNoDataRegionAction((Increment) row, cells, action.getNonce(), builder, actionBuilder,
mutationBuilder);
hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
@ -636,6 +626,16 @@ public final class RequestConverter {
mutationBuilder.clear();
buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
} else if (cam.getAction() instanceof Increment) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
actionBuilder, mutationBuilder);
} else if (cam.getAction() instanceof Append) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
actionBuilder, mutationBuilder);
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
@ -682,6 +682,24 @@ public final class RequestConverter {
}
}
private static void buildNoDataRegionAction(final Increment increment,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
cells.add(increment);
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, increment, mutationBuilder, nonce)));
}
private static void buildNoDataRegionAction(final Append append,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
cells.add(append);
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, append, mutationBuilder, nonce)));
}
private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
@ -704,6 +722,19 @@ public final class RequestConverter {
}
}
private static MutationType getMutationType(Mutation mutation) {
assert !(mutation instanceof CheckAndMutate);
if (mutation instanceof Put) {
return MutationType.PUT;
} else if (mutation instanceof Delete) {
return MutationType.DELETE;
} else if (mutation instanceof Increment) {
return MutationType.INCREMENT;
} else {
return MutationType.APPEND;
}
}
// End utilities for Client
//Start utilities for Admin

View File

@ -226,11 +226,11 @@ public final class ResponseConverter {
* @return a CheckAndMutateResult object
*/
public static CheckAndMutateResult getCheckAndMutateResult(
ClientProtos.MutateResponse mutateResponse) {
ClientProtos.MutateResponse mutateResponse, CellScanner cells) throws IOException {
boolean success = mutateResponse.getProcessed();
Result result = null;
if (mutateResponse.hasResult()) {
result = ProtobufUtil.toResult(mutateResponse.getResult());
result = ProtobufUtil.toResult(mutateResponse.getResult(), cells);
}
return new CheckAndMutateResult(success, result);
}

View File

@ -441,8 +441,9 @@ public interface RegionObserver {
/**
* This will be called for every batch mutation operation happening at the server. This will be
* called after acquiring the locks on the mutating rows and after applying the proper timestamp
* for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus
* of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
* for each Mutation at the server. The batch may contain Put/Delete/Increment/Append. By
* setting OperationStatus of Mutations
* ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
* {@link RegionObserver} can make Region to skip these Mutations.
* <p>
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@ -454,10 +455,12 @@ public interface RegionObserver {
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {}
/**
* This will be called after applying a batch of Mutations on a region. The Mutations are added to
* memstore and WAL. The difference of this one with
* {@link #postPut(ObserverContext, Put, WALEdit, Durability) }
* and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is
* This will be called after applying a batch of Mutations on a region. The Mutations are added
* to memstore and WAL. The difference of this one with
* {@link #postPut(ObserverContext, Put, WALEdit, Durability)}
* and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)}
* and {@link #postIncrement(ObserverContext, Increment, Result)}
* and {@link #postAppend(ObserverContext, Append, Result)} is
* this hook will be executed before the mvcc transaction completion.
* <p>
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@ -488,8 +491,8 @@ public interface RegionObserver {
Operation operation) throws IOException {}
/**
* Called after the completion of batch put/delete and will be called even if the batch operation
* fails.
* Called after the completion of batch put/delete/increment/append and will be called even if
* the batch operation fails.
* <p>
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.

View File

@ -45,6 +45,8 @@ public class MiniBatchOperationInProgress<T> {
private int cellCount = 0;
private int numOfPuts = 0;
private int numOfDeletes = 0;
private int numOfIncrements = 0;
private int numOfAppends = 0;
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
@ -169,4 +171,20 @@ public class MiniBatchOperationInProgress<T> {
public void incrementNumOfDeletes() {
this.numOfDeletes += 1;
}
public int getNumOfIncrements() {
return numOfIncrements;
}
public void incrementNumOfIncrements() {
this.numOfIncrements += 1;
}
public int getNumOfAppends() {
return numOfAppends;
}
public void incrementNumOfAppends() {
this.numOfAppends += 1;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -43,21 +44,29 @@ public class OperationStatus {
public static final OperationStatus NOT_RUN = new OperationStatus(OperationStatusCode.NOT_RUN);
private final OperationStatusCode code;
private final Result result;
private final String exceptionMsg;
public OperationStatus(OperationStatusCode code) {
this(code, "");
this(code, null, "");
}
public OperationStatus(OperationStatusCode code, Result result) {
this(code, result, "");
}
public OperationStatus(OperationStatusCode code, String exceptionMsg) {
this.code = code;
this.exceptionMsg = exceptionMsg;
this(code, null, exceptionMsg);
}
public OperationStatus(OperationStatusCode code, Exception e) {
this(code, null, (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage());
}
private OperationStatus(OperationStatusCode code, Result result, String exceptionMsg) {
this.code = code;
this.exceptionMsg = (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage();
this.result = result;
this.exceptionMsg = exceptionMsg;
}
/**
@ -67,6 +76,13 @@ public class OperationStatus {
return code;
}
/**
* @return result
*/
public Result getResult() {
return result;
}
/**
* @return ExceptionMessge
*/

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
@ -553,38 +552,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
/**
* Starts the nonce operation for a mutation, if needed.
* @param mutation Mutation.
* @param nonceGroup Nonce group from the request.
* @return whether to proceed this mutation.
*/
private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
throws IOException {
if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
boolean canProceed = false;
try {
canProceed = regionServer.nonceManager.startOperation(
nonceGroup, mutation.getNonce(), regionServer);
} catch (InterruptedException ex) {
throw new InterruptedIOException("Nonce start operation interrupted");
}
return canProceed;
}
/**
* Ends nonce operation for a mutation, if needed.
* @param mutation Mutation.
* @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
* @param success Whether the operation for this nonce has succeeded.
*/
private void endNonceOperation(final MutationProto mutation,
long nonceGroup, boolean success) {
if (regionServer.nonceManager != null && mutation.hasNonce()) {
regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
}
}
private boolean isClientCellBlockSupport(RpcCallContext context) {
return context != null && context.isClientCellBlockSupported();
}
@ -622,7 +589,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
int countOfCompleteMutation = 0;
try {
@ -695,35 +662,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkCellSizeLimit(region, append);
spaceQuota.getPolicyEnforcement(region).check(append);
quota.addMutation(append);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preAppend(append);
}
if (r == null) {
boolean canProceed = startNonceOperation(mutation, nonceGroup);
boolean success = false;
try {
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
if (canProceed) {
r = region.append(append, nonceGroup, nonce);
} else {
// convert duplicate append to get
List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
r = Result.create(results);
}
success = true;
} finally {
if (canProceed) {
endNonceOperation(mutation, nonceGroup, success);
}
}
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().postAppend(append, r);
}
}
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
Result r = region.append(append, nonceGroup, nonce);
if (regionServer.getMetrics() != null) {
regionServer.getMetrics().updateAppend(
region.getTableDescriptor().getTableName(),
regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
return r == null ? Result.EMPTY_RESULT : r;
@ -741,66 +683,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkCellSizeLimit(region, increment);
spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preIncrement(increment);
}
if (r == null) {
boolean canProceed = startNonceOperation(mutation, nonceGroup);
boolean success = false;
try {
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
if (canProceed) {
r = region.increment(increment, nonceGroup, nonce);
} else {
// convert duplicate increment to get
List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
r = Result.create(results);
}
success = true;
} finally {
if (canProceed) {
endNonceOperation(mutation, nonceGroup, success);
}
}
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().postIncrement(increment, r);
}
}
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
Result r = region.increment(increment, nonceGroup, nonce);
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateIncrement(
region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
return r == null ? Result.EMPTY_RESULT : r;
}
private static Get toGet(final Mutation mutation) throws IOException {
if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
throw new AssertionError("mutation must be a instance of Increment or Append");
}
Get get = new Get(mutation.getRow());
CellScanner cellScanner = mutation.cellScanner();
while (!cellScanner.advance()) {
Cell cell = cellScanner.current();
get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
}
if (mutation instanceof Increment) {
// Increment
Increment increment = (Increment) mutation;
get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
} else {
// Append
Append append = (Append) mutation;
get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
}
for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
get.setAttribute(entry.getKey(), entry.getValue());
}
return get;
}
/**
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
@ -2910,23 +2802,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
.setResult(ProtobufUtil.toResult(result.getResult())).build());
continue;
if (regionAction.getActionCount() == 1) {
CheckAndMutateResult result = checkAndMutate(region, quota,
regionAction.getAction(0).getMutation(), cellScanner,
regionAction.getCondition(), spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
resultOrExceptionOrBuilder.setIndex(0);
if (result.getResult() != null) {
resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
}
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
} else {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
.setResult(ProtobufUtil.toResult(result.getResult())).build());
continue;
}
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
}
}
} catch (IOException e) {
rpcServer.getMetrics().exception(e);

View File

@ -292,7 +292,8 @@ public interface Region extends ConfigurationObserver {
/**
* Perform a batch of mutations.
* <p>
* Note this supports only Put and Delete mutations and will ignore other types passed.
* Note this supports only Put, Delete, Increment and Append mutations and will ignore other
* types passed.
* @param mutations the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.

View File

@ -1506,8 +1506,11 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
if (m instanceof Put) {
checkForReservedTagPresence(user, m);
opType = OpType.PUT;
} else {
} else if (m instanceof Delete) {
opType = OpType.DELETE;
} else {
// If the operation type is not Put or Delete, do nothing
continue;
}
AuthResult authResult = null;
if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),

View File

@ -636,6 +636,7 @@ public class TestAsyncTable {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
latch.countDown();
}));
latch.await();
@ -670,6 +671,7 @@ public class TestAsyncTable {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
deleteLatch.countDown();
}));
deleteLatch.await();
@ -717,6 +719,7 @@ public class TestAsyncTable {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
mutateLatch.countDown();
});
});
@ -743,18 +746,21 @@ public class TestAsyncTable {
.ifNotExists(FAMILY, QUALIFIER)
.build(put)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(put)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(put)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
RowMutations rm = new RowMutations(row).add((Mutation) put);
@ -763,12 +769,14 @@ public class TestAsyncTable {
.timeRange(TimeRange.at(ts + 10000))
.build(rm)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(rm)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
@ -777,12 +785,14 @@ public class TestAsyncTable {
.timeRange(TimeRange.at(ts + 10000))
.build(delete)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(delete)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
}
@Test
@ -802,6 +812,7 @@ public class TestAsyncTable {
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -812,6 +823,7 @@ public class TestAsyncTable {
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
@ -821,6 +833,7 @@ public class TestAsyncTable {
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
@ -833,6 +846,7 @@ public class TestAsyncTable {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -860,6 +874,7 @@ public class TestAsyncTable {
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -873,6 +888,7 @@ public class TestAsyncTable {
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
@ -885,6 +901,7 @@ public class TestAsyncTable {
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
@ -900,6 +917,7 @@ public class TestAsyncTable {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -922,6 +940,7 @@ public class TestAsyncTable {
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@ -934,6 +953,7 @@ public class TestAsyncTable {
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@ -953,6 +973,7 @@ public class TestAsyncTable {
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@ -965,10 +986,125 @@ public class TestAsyncTable {
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
.get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
public void testCheckAndIncrement() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndIncrement with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
assertTrue(res.isSuccess());
assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
assertTrue(res.isSuccess());
assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndAppend() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndAppend with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(res.isSuccess());
assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndAppend with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
assertTrue(res.isSuccess());
assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with a filter and wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
// Tests for batch version of checkAndMutate
@Test
@ -997,7 +1133,9 @@ public class TestAsyncTable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -1017,7 +1155,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
@ -1042,7 +1182,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row3)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
@ -1079,7 +1221,9 @@ public class TestAsyncTable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -1099,7 +1243,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -1121,7 +1267,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -1166,7 +1314,9 @@ public class TestAsyncTable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -1194,7 +1344,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
@ -1227,7 +1379,9 @@ public class TestAsyncTable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
@ -1273,7 +1427,9 @@ public class TestAsyncTable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -1282,6 +1438,80 @@ public class TestAsyncTable {
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
@Test
public void testCheckAndIncrementBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals(1, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndAppendBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
// CheckAndAppend with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
// CheckAndAppend with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals("bb", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();

View File

@ -344,13 +344,17 @@ public class TestAsyncTableBatch {
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] row5 = Bytes.toBytes("row5");
byte[] row6 = Bytes.toBytes("row6");
byte[] row7 = Bytes.toBytes("row7");
table.putAll(Arrays.asList(
new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))).get();
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
@ -363,17 +367,36 @@ public class TestAsyncTableBatch {
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
.build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
.ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
.build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
checkAndMutate3, checkAndMutate4);
List<Object> results = table.batchAll(actions).get();
assertTrue(((CheckAndMutateResult) results.get(0)).isSuccess());
assertNull(((CheckAndMutateResult) results.get(0)).getResult());
assertEquals("b",
Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results.get(2)).getExists());
assertFalse(((CheckAndMutateResult) results.get(3)).isSuccess());
assertNull(((CheckAndMutateResult) results.get(3)).getResult());
assertTrue(((Result) results.get(4)).isEmpty());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(5);
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("F"))));
checkAndMutateResult = (CheckAndMutateResult) results.get(6);
assertTrue(checkAndMutateResult.isSuccess());
assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("G"))));
Result result = table.get(new Get(row1)).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -386,5 +409,11 @@ public class TestAsyncTableBatch {
result = table.get(new Get(row5)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
result = table.get(new Get(row6)).get();
assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
result = table.get(new Get(row7)).get();
assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
}
}

View File

@ -372,6 +372,7 @@ public class TestCheckAndMutate {
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(rm));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// get row back and assert the values
getOneRowAndAssertAllButCExist(table);
@ -405,6 +406,7 @@ public class TestCheckAndMutate {
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -415,6 +417,7 @@ public class TestCheckAndMutate {
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
@ -424,6 +427,7 @@ public class TestCheckAndMutate {
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
@ -436,6 +440,7 @@ public class TestCheckAndMutate {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -461,6 +466,7 @@ public class TestCheckAndMutate {
Bytes.toBytes("b"))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -474,6 +480,7 @@ public class TestCheckAndMutate {
Bytes.toBytes("c"))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
@ -486,6 +493,7 @@ public class TestCheckAndMutate {
Bytes.toBytes("b"))))
.build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
@ -501,6 +509,7 @@ public class TestCheckAndMutate {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@ -523,6 +532,7 @@ public class TestCheckAndMutate {
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@ -535,6 +545,7 @@ public class TestCheckAndMutate {
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
@ -553,6 +564,7 @@ public class TestCheckAndMutate {
.timeRange(TimeRange.between(0, 101))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@ -564,6 +576,7 @@ public class TestCheckAndMutate {
.timeRange(TimeRange.between(0, 100))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
@ -575,6 +588,120 @@ public class TestCheckAndMutate {
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
@Test
public void testCheckAndIncrement() throws Throwable {
try (Table table = createTable()) {
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndIncrement with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
assertTrue(res.isSuccess());
assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
assertTrue(res.isSuccess());
assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
}
@Test
public void testCheckAndAppend() throws Throwable {
try (Table table = createTable()) {
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndAppend with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(res.isSuccess());
assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndAppend with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
assertTrue(res.isSuccess());
assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with a filter and wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
}
// Tests for batch version of checkAndMutate
@Test
@ -599,7 +726,9 @@ public class TestCheckAndMutate {
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -619,7 +748,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
@ -644,7 +775,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(ROWKEY3));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
@ -678,7 +811,9 @@ public class TestCheckAndMutate {
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -698,7 +833,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -720,7 +857,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -764,7 +903,9 @@ public class TestCheckAndMutate {
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -792,7 +933,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
@ -825,7 +968,9 @@ public class TestCheckAndMutate {
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(ROWKEY));
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
@ -870,7 +1015,9 @@ public class TestCheckAndMutate {
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@ -879,4 +1026,76 @@ public class TestCheckAndMutate {
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
}
@Test
public void testCheckAndIncrementBatch() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L))));
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Increment(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
List<CheckAndMutateResult> results =
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertEquals(1, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
}
@Test
public void testCheckAndAppendBatch() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
// CheckAndAppend with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
// CheckAndAppend with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Append(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
List<CheckAndMutateResult> results =
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertEquals("bb", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
}
}

View File

@ -460,13 +460,17 @@ public class TestFromClientSide3 {
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] row5 = Bytes.toBytes("row5");
byte[] row6 = Bytes.toBytes("row6");
byte[] row7 = Bytes.toBytes("row7");
table.put(Arrays.asList(
new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
@ -479,18 +483,37 @@ public class TestFromClientSide3 {
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
.build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
.ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
.build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
checkAndMutate3, checkAndMutate4);
Object[] results = new Object[actions.size()];
table.batch(actions, results);
assertTrue(((CheckAndMutateResult) results[0]).isSuccess());
assertNull(((CheckAndMutateResult) results[0]).getResult());
assertEquals("b",
Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results[2]).getExists());
assertFalse(((CheckAndMutateResult) results[3]).isSuccess());
assertNull(((CheckAndMutateResult) results[3]).getResult());
assertTrue(((Result) results[4]).isEmpty());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[5];
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("F"))));
checkAndMutateResult = (CheckAndMutateResult) results[6];
assertTrue(checkAndMutateResult.isSuccess());
assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("G"))));
Result result = table.get(new Get(row1));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@ -503,6 +526,12 @@ public class TestFromClientSide3 {
result = table.get(new Get(row5));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
result = table.get(new Get(row6));
assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
result = table.get(new Get(row7));
assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
}
}

View File

@ -656,7 +656,7 @@ public class TestAtomicOperation {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
puts[0] = put;
region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
region.batchMutate(puts);
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
ctx.addThread(new PutThread(ctx, region));

View File

@ -2322,8 +2322,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
e.getMessage());
}
try {
@ -2332,8 +2332,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
e.getMessage());
}
try {
@ -2345,8 +2345,8 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
e.getMessage());
}
try {
@ -2358,8 +2358,8 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
e.getMessage());
}
}
@ -2846,6 +2846,127 @@ public class TestHRegion {
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
}
@Test
public void testCheckAndIncrement() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndIncrement with correct value
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
assertTrue(res.isSuccess());
assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with wrong value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndIncrement with a filter and correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
assertTrue(res.isSuccess());
assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with a filter and correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndAppend() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
// CheckAndAppend with correct value
CheckAndMutateResult res =
region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(res.isSuccess());
assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with wrong value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndAppend with a filter and correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
assertTrue(res.isSuccess());
assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with a filter and wrong value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////