HBASE-8458 Support for batch version of checkAndMutate() (#1897)

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Toshihiro Suzuki 2020-06-14 13:55:06 +09:00 committed by GitHub
parent 2d57595f54
commit 6ff6fe49a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 2368 additions and 280 deletions

View File

@ -256,7 +256,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
@ -264,14 +264,14 @@ class AsyncBatchRpcRetryingCaller<T> {
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
// action list.
RequestConverter.buildNoDataRegionActions(entry.getKey(),
entry.getValue().actions.stream()
.sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
.collect(Collectors.toList()),
cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
rowMutationsIndexMap);
cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder,
nonceGroup, indexMap);
}
return multiRequestBuilder.build();
}
@ -367,10 +367,10 @@ class AsyncBatchRpcRetryingCaller<T> {
List<CellScannable> cells = new ArrayList<>();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// is RowMutations/CheckAndMutate in the action list.
Map<Integer, Integer> indexMap = new HashMap<>();
try {
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
req = buildReq(serverReq.actionsByRegion, cells, indexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
@ -387,7 +387,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else {
try {
onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
rowMutationsIndexMap, resp, controller.cellScanner()));
indexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;

View File

@ -231,12 +231,20 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* });
* </code>
* </pre>
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
/**
* A helper class for sending checkAndMutate request.
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateBuilder {
/**
@ -309,12 +317,20 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* });
* </code>
* </pre>
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
/**
* A helper class for sending checkAndMutate request with a filter.
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateWithFilterBuilder {
/**
@ -344,6 +360,37 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}
/**
* checkAndMutate that atomically checks if a row matches the specified condition. If it does,
* it performs the specified action.
*
* @param checkAndMutate The CheckAndMutate object.
* @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
*/
CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate);
/**
* Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
* that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
* atomically (and thus, each may fail independently of others).
*
* @param checkAndMutates The list of CheckAndMutate.
* @return A list of {@link CompletableFuture}s that represent the result for each
* CheckAndMutate.
*/
List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates);
/**
* A simple version of batch checkAndMutate. It will fail if there are any failures.
*
* @param checkAndMutates The list of rows to apply.
* @return A {@link CompletableFuture} that wrapper the result boolean list.
*/
default CompletableFuture<List<Boolean>> checkAndMutateAll(
List<CheckAndMutate> checkAndMutates) {
return allOf(checkAndMutate(checkAndMutates));
}
/**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.

View File

@ -204,6 +204,17 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
};
}
@Override
public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
return wrap(rawTable.checkAndMutate(checkAndMutate));
}
@Override
public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream()
.map(this::wrap).collect(toList());
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));

View File

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
* and {@link RowMutations} are supported.
* <p>
* Use the builder class to instantiate a CheckAndMutate object.
* This builder class is fluent style APIs, the code are like:
* <pre>
* <code>
* // A CheckAndMutate operation where do the specified action if the column (specified by the
* // family and the qualifier) of the row equals to the specified value
* CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
* .ifEquals(family, qualifier, value)
* .build(put);
*
* // A CheckAndMutate operation where do the specified action if the column (specified by the
* // family and the qualifier) of the row doesn't exist
* CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
* .ifNotExists(family, qualifier)
* .build(put);
*
* // A CheckAndMutate operation where do the specified action if the row matches the filter
* CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
* .ifMatches(filter)
* .build(delete);
* </code>
* </pre>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class CheckAndMutate extends Mutation {
/**
* A builder class for building a CheckAndMutate object.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static final class Builder {
private final byte[] row;
private byte[] family;
private byte[] qualifier;
private CompareOperator op;
private byte[] value;
private Filter filter;
private TimeRange timeRange;
private Builder(byte[] row) {
this.row = Preconditions.checkNotNull(row, "row is null");
}
/**
* Check for lack of column
*
* @param family family to check
* @param qualifier qualifier to check
* @return the CheckAndMutate object
*/
public Builder ifNotExists(byte[] family, byte[] qualifier) {
return ifEquals(family, qualifier, null);
}
/**
* Check for equality
*
* @param family family to check
* @param qualifier qualifier to check
* @param value the expected value
* @return the CheckAndMutate object
*/
public Builder ifEquals(byte[] family, byte[] qualifier, byte[] value) {
return ifMatches(family, qualifier, CompareOperator.EQUAL, value);
}
/**
* @param family family to check
* @param qualifier qualifier to check
* @param compareOp comparison operator to use
* @param value the expected value
* @return the CheckAndMutate object
*/
public Builder ifMatches(byte[] family, byte[] qualifier, CompareOperator compareOp,
byte[] value) {
this.family = Preconditions.checkNotNull(family, "family is null");
this.qualifier = qualifier;
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
this.value = value;
return this;
}
/**
* @param filter filter to check
* @return the CheckAndMutate object
*/
public Builder ifMatches(Filter filter) {
this.filter = Preconditions.checkNotNull(filter, "filter is null");
return this;
}
/**
* @param timeRange time range to check
* @return the CheckAndMutate object
*/
public Builder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}
private void preCheck(Row action) {
Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
if (!Bytes.equals(row, action.getRow())) {
throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
Preconditions.checkState(op != null || filter != null, "condition is null. You need to"
+ " specify the condition by calling ifNotExists/ifEquals/ifMatches before building a"
+ " CheckAndMutate object");
}
/**
* @param put data to put if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Put put) {
preCheck(put);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, put);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put);
}
}
/**
* @param delete data to delete if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Delete delete) {
preCheck(delete);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, delete);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete);
}
}
/**
* @param mutation mutations to perform if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(RowMutations mutation) {
preCheck(mutation);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutation);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
}
}
}
/**
* returns a builder object to build a CheckAndMutate object
*
* @param row row
* @return a builder object
*/
public static Builder newBuilder(byte[] row) {
return new Builder(row);
}
private final byte[] family;
private final byte[] qualifier;
private final CompareOperator op;
private final byte[] value;
private final Filter filter;
private final TimeRange timeRange;
private final Row action;
private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.family = family;
this.qualifier = qualifier;
this.op = op;
this.value = value;
this.filter = null;
this.timeRange = timeRange;
this.action = action;
}
private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.family = null;
this.qualifier = null;
this.op = null;
this.value = null;
this.filter = filter;
this.timeRange = timeRange;
this.action = action;
}
/**
* @return the family to check
*/
public byte[] getFamily() {
return family;
}
/**
* @return the qualifier to check
*/
public byte[] getQualifier() {
return qualifier;
}
/**
* @return the comparison operator
*/
public CompareOperator getCompareOp() {
return op;
}
/**
* @return the expected value
*/
public byte[] getValue() {
return value;
}
/**
* @return the filter to check
*/
public Filter getFilter() {
return filter;
}
/**
* @return the time range to check
*/
public TimeRange getTimeRange() {
return timeRange;
}
/**
* @return the action done if check succeeds
*/
public Row getAction() {
return action;
}
@Override
public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
if (action instanceof Mutation) {
return ((Mutation) action).getFamilyCellMap();
}
throw new UnsupportedOperationException();
}
@Override
public long getTimestamp() {
if (action instanceof Mutation) {
return ((Mutation) action).getTimestamp();
}
throw new UnsupportedOperationException();
}
@Override
public Mutation setTimestamp(long timestamp) {
if (action instanceof Mutation) {
return ((Mutation) action).setTimestamp(timestamp);
}
throw new UnsupportedOperationException();
}
@Override
public Durability getDurability() {
if (action instanceof Mutation) {
return ((Mutation) action).getDurability();
}
throw new UnsupportedOperationException();
}
@Override
public Mutation setDurability(Durability d) {
if (action instanceof Mutation) {
return ((Mutation) action).setDurability(d);
}
throw new UnsupportedOperationException();
}
@Override
public byte[] getAttribute(String name) {
if (action instanceof Mutation) {
return ((Mutation) action).getAttribute(name);
}
throw new UnsupportedOperationException();
}
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
if (action instanceof Mutation) {
return ((Mutation) action).setAttribute(name, value);
}
throw new UnsupportedOperationException();
}
@Override
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getPriority();
}
return ((RowMutations) action).getMaxPriority();
}
@Override
public OperationWithAttributes setPriority(int priority) {
if (action instanceof Mutation) {
return ((Mutation) action).setPriority(priority);
}
throw new UnsupportedOperationException();
}
}

View File

@ -772,11 +772,13 @@ public class HTable implements Table {
}
@Override
@Deprecated
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(row, family);
}
@Override
@Deprecated
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
}
@ -849,6 +851,52 @@ public class HTable implements Table {
return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm);
}
@Override
public boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
Row action = checkAndMutate.getAction();
if (action instanceof Put) {
Put put = (Put) action;
validatePut(put);
return doCheckAndPut(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), put);
} else if (action instanceof Delete) {
return doCheckAndDelete(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Delete) action);
} else {
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
}
}
@Override
public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws IOException {
if (checkAndMutates.isEmpty()) {
return new boolean[]{};
}
if (checkAndMutates.size() == 1) {
return new boolean[]{ checkAndMutate(checkAndMutates.get(0)) };
}
Object[] results = new Object[checkAndMutates.size()];
try {
batch(checkAndMutates, results, writeRpcTimeoutMs);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
// translate.
boolean[] ret = new boolean[results.length];
int i = 0;
for (Object r : results) {
// Batch ensures if there is a failure we get an exception instead
ret[i++] = ((Result) r).getExists();
}
return ret;
}
private CompareOperator toCompareOperator(CompareOp compareOp) {
switch (compareOp) {
case LESS:

View File

@ -99,10 +99,10 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
long nonceGroup = multiAction.getNonceGroup();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// Map from a created RegionAction to the original index for a RowMutations/CheckAndMutate
// within the original list of actions. This will be used to process the results when there
// is RowMutations/CheckAndMutate in the action list.
Map<Integer, Integer> indexMap = new HashMap<>();
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
@ -110,17 +110,16 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
if (this.cellBlock) {
// Send data in cellblocks.
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
// indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in
// the action list.
RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
}
else {
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, indexMap);
} else {
// multiRequestBuilder will be populated with region actions.
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
// action list.
// indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in
// the action list.
RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, indexMap);
}
}
@ -130,7 +129,7 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, responseProto,
return ResponseConverter.getResults(requestProto, indexMap, responseProto,
getRpcControllerCellScanner());
}

View File

@ -149,10 +149,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return a list of Cell objects, returns an empty list if one doesn't exist.
*/
List<Cell> getCellList(byte[] family) {
List<Cell> list = this.familyMap.get(family);
List<Cell> list = getFamilyCellMap().get(family);
if (list == null) {
list = new ArrayList<>();
this.familyMap.put(family, list);
getFamilyCellMap().put(family, list);
}
return list;
}
@ -201,11 +201,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> map = new HashMap<>();
List<String> families = new ArrayList<>(this.familyMap.entrySet().size());
List<String> families = new ArrayList<>(getFamilyCellMap().entrySet().size());
// ideally, we would also include table information, but that information
// is not stored in each Operation instance.
map.put("families", families);
for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
families.add(Bytes.toStringBinary(entry.getKey()));
}
return map;
@ -229,7 +229,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
map.put("row", Bytes.toStringBinary(this.row));
int colCount = 0;
// iterate through all column families affected
for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
// map from this family to details for each cell affected within the family
List<Map<String, Object>> qualifierDetails = new ArrayList<>();
columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
@ -319,7 +319,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return true if empty, false otherwise
*/
public boolean isEmpty() {
return familyMap.isEmpty();
return getFamilyCellMap().isEmpty();
}
/**
@ -461,7 +461,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
public int size() {
int size = 0;
for (List<Cell> cells : this.familyMap.values()) {
for (List<Cell> cells : getFamilyCellMap().values()) {
size += cells.size();
}
return size;
@ -471,7 +471,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return the number of different families
*/
public int numFamilies() {
return familyMap.size();
return getFamilyCellMap().size();
}
/**
@ -485,8 +485,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// Adding map overhead
heapsize +=
ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY);
for(Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
ClassSize.align(getFamilyCellMap().size() * ClassSize.MAP_ENTRY);
for(Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
//Adding key overhead
heapsize +=
ClassSize.align(ClassSize.ARRAY + entry.getKey().length);

View File

@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@ -396,7 +397,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateBuilderImpl(row, family);
}
private final class CheckAndMutateWithFilterBuilderImpl
implements CheckAndMutateWithFilterBuilder {
@ -458,6 +458,54 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
}
@Override
public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete) {
Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
}
return RawAsyncTableImpl.this.<Boolean> newCaller(checkAndMutate.getRow(),
mutation.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, mutation,
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
(c, r) -> r.getProcessed()))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
return RawAsyncTableImpl.this.<Boolean> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, rowMutations,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), rm),
resp -> resp.getExists()))
.call();
} else {
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.completeExceptionally(new DoNotRetryIOException(
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
}
@Override
public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> ((Result)r).getExists()))
.collect(toList());
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
@ -594,8 +642,16 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
.forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
for (Row action : actions) {
if (action instanceof Put) {
validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
} else if (action instanceof CheckAndMutate) {
CheckAndMutate checkAndMutate = (CheckAndMutate) action;
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
}
}
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)

View File

@ -492,14 +492,22 @@ public interface Table extends Closeable {
* table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put);
* </code>
* </pre>
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
default CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
throw new NotImplementedException("Add an implementation!");
}
/**
* A helper class for sending checkAndMutate request.
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateBuilder {
/**
@ -562,14 +570,22 @@ public interface Table extends Closeable {
* table.checkAndMutate(row, filter).thenPut(put);
* </code>
* </pre>
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Add an implementation!");
}
/**
* A helper class for sending checkAndMutate request with a filter.
*
* @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it
* any more.
*/
@Deprecated
interface CheckAndMutateWithFilterBuilder {
/**
@ -596,6 +612,31 @@ public interface Table extends Closeable {
boolean thenMutate(RowMutations mutation) throws IOException;
}
/**
* checkAndMutate that atomically checks if a row matches the specified condition. If it does,
* it performs the specified action.
*
* @param checkAndMutate The CheckAndMutate object.
* @return boolean that represents the result for the CheckAndMutate.
* @throws IOException if a remote or network exception occurs.
*/
default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
return checkAndMutate(Collections.singletonList(checkAndMutate))[0];
}
/**
* Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
* that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
* atomically (and thus, each may fail independently of others).
*
* @param checkAndMutates The list of CheckAndMutate.
* @return A array of boolean that represents the result for each CheckAndMutate.
* @throws IOException if a remote or network exception occurs.
*/
default boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws IOException {
throw new NotImplementedException("Add an implementation!");
}
/**
* Performs multiple mutations atomically on a single row. Currently
* {@link Put} and {@link Delete} are supported.

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -237,37 +238,20 @@ public final class RequestConverter {
}
/**
* Create a protocol buffer MutateRequest for a conditioned put
* Create a protocol buffer MutateRequest for a conditioned put/delete
*
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Put put) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
put, MutationType.PUT);
}
/**
* Create a protocol buffer MutateRequest for a conditioned delete
*
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Delete delete) throws IOException {
return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
delete, MutationType.DELETE);
}
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation,
final MutationType type) throws IOException {
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else {
type = MutationType.DELETE;
}
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
@ -306,9 +290,8 @@ public final class RequestConverter {
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
}
/**
@ -426,42 +409,6 @@ public final class RequestConverter {
return builder;
}
/**
* Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells
* are carried outside of protobuf. Return references to the Cells in <code>cells</code> param.
* Does not propagate Action absolute position. Does not set atomic action on the created
* RegionAtomic. Caller should do that if wanted.
* @param regionName
* @param rowMutations
* @param cells Return in here a list of Cells as CellIterable.
* @return a region mutation minus data
* @throws IOException
*/
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final RowMutations rowMutations, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type = null;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else if (mutation instanceof Delete) {
type = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
return regionActionBuilder;
}
public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@ -627,8 +574,8 @@ public final class RequestConverter {
* @param actionBuilder actionBuilder to be used to build action.
* @param mutationBuilder mutationBuilder to be used to build mutation.
* @param nonceGroup nonceGroup to be applied.
* @param rowMutationsIndexMap Map of created RegionAction to the original index for a
* RowMutations within the original list of actions
* @param indexMap Map of created RegionAction to the original index for a
* RowMutations/CheckAndMutate within the original list of actions
* @throws IOException
*/
public static void buildRegionActions(final byte[] regionName,
@ -636,13 +583,14 @@ public final class RequestConverter {
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder,
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
long nonceGroup, final Map<Integer, Integer> indexMap) throws IOException {
regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
boolean hasNonce = false;
List<Action> rowMutationsList = new ArrayList<>();
List<Action> checkAndMutates = new ArrayList<>();
for (Action action: actions) {
Row row = action.getAction();
@ -684,6 +632,8 @@ public final class RequestConverter {
.setRequest(value)));
} else if (row instanceof RowMutations) {
rowMutationsList.add(action);
} else if (row instanceof CheckAndMutate) {
checkAndMutates.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
@ -699,17 +649,78 @@ public final class RequestConverter {
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
for (Action action : rowMutationsList) {
RowMutations rms = (RowMutations) action.getAction();
RegionAction.Builder rowMutationsRegionActionBuilder =
RequestConverter.buildRegionAction(regionName, rms);
rowMutationsRegionActionBuilder.setAtomic(true);
// Put it in the multiRequestBuilder
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
buildRegionAction((RowMutations) action.getAction(), builder, actionBuilder,
mutationBuilder);
builder.setAtomic(true);
multiRequestBuilder.addRegionAction(builder.build());
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
action.getOriginalIndex());
}
// Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each
// CheckAndMutate and maintain a map to keep track of this RegionAction and the original
// Action index.
for (Action action : checkAndMutates) {
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
mutationBuilder.clear();
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(MutationType.PUT,
(Put) cam.getAction(), mutationBuilder)));
} else if (cam.getAction() instanceof Delete) {
actionBuilder.clear();
mutationBuilder.clear();
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE,
(Delete) cam.getAction(), mutationBuilder)));
} else if (cam.getAction() instanceof RowMutations) {
buildRegionAction((RowMutations) cam.getAction(), builder, actionBuilder,
mutationBuilder);
builder.setAtomic(true);
} else {
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
cam.getAction().getClass().getName());
}
multiRequestBuilder.addRegionAction(builder.build());
// This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}
}
private static void buildRegionAction(final RowMutations rowMutations,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
}
/**
@ -729,8 +740,8 @@ public final class RequestConverter {
* @param actionBuilder actionBuilder to be used to build action.
* @param mutationBuilder mutationBuilder to be used to build mutation.
* @param nonceGroup nonceGroup to be applied.
* @param rowMutationsIndexMap Map of created RegionAction to the original index for a
* RowMutations within the original list of actions
* @param indexMap Map of created RegionAction to the original index for a
* RowMutations/CheckAndMutate within the original list of actions
* @throws IOException
*/
public static void buildNoDataRegionActions(final byte[] regionName,
@ -739,14 +750,14 @@ public final class RequestConverter {
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder,
long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
long nonceGroup, final Map<Integer, Integer> indexMap) throws IOException {
regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
RegionAction.Builder rowMutationsRegionActionBuilder = null;
boolean hasNonce = false;
List<Action> rowMutationsList = new ArrayList<>();
List<Action> checkAndMutates = new ArrayList<>();
for (Action action: actions) {
Row row = action.getAction();
@ -757,26 +768,9 @@ public final class RequestConverter {
Get g = (Get)row;
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
Put p = (Put)row;
cells.add(p);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder)));
buildNoDataRegionAction((Put) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Delete) {
Delete d = (Delete)row;
int size = d.size();
// Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
// in it but the row to delete. In this case, the current implementation does not make
// a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
// where the size returned is zero, we will send the Delete fully pb'd rather than have
// metadata only in the pb and then send the kv along the side in cells.
if (size > 0) {
cells.add(d);
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder)));
} else {
builder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder)));
}
buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
@ -807,6 +801,8 @@ public final class RequestConverter {
.setRequest(value)));
} else if (row instanceof RowMutations) {
rowMutationsList.add(action);
} else if (row instanceof CheckAndMutate) {
checkAndMutates.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
@ -822,23 +818,106 @@ public final class RequestConverter {
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
for (Action action : rowMutationsList) {
RowMutations rms = (RowMutations) action.getAction();
if (rowMutationsRegionActionBuilder == null) {
rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
} else {
rowMutationsRegionActionBuilder.clear();
}
rowMutationsRegionActionBuilder.setRegion(
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
rowMutationsRegionActionBuilder.setAtomic(true);
// Put it in the multiRequestBuilder
multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
mutationBuilder);
builder.setAtomic(true);
multiRequestBuilder.addRegionAction(builder.build());
// This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
action.getOriginalIndex());
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}
// Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each
// CheckAndMutate and maintain a map to keep track of this RegionAction and the original
// Action index.
for (Action action : checkAndMutates) {
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Put) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
} else if (cam.getAction() instanceof Delete) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
builder.setAtomic(true);
} else {
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
cam.getAction().getClass().getName());
}
multiRequestBuilder.addRegionAction(builder.build());
// This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}
}
private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
cells.add(put);
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, put, mutationBuilder)));
}
private static void buildNoDataRegionAction(final Delete delete,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
int size = delete.size();
// Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
// in it but the row to delete. In this case, the current implementation does not make
// a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
// where the size returned is zero, we will send the Delete fully pb'd rather than have
// metadata only in the pb and then send the kv along the side in cells.
if (size > 0) {
cells.add(delete);
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, delete, mutationBuilder)));
} else {
regionActionBuilder.addAction(actionBuilder.
setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, mutationBuilder)));
}
}
private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else if (mutation instanceof Delete) {
type = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
}

View File

@ -105,14 +105,14 @@ public final class ResponseConverter {
* Get the results from a protocol buffer MultiResponse
*
* @param request the original protocol buffer MultiRequest
* @param rowMutationsIndexMap Used to support RowMutations in batch
* @param indexMap Used to support RowMutations/CheckAndMutate in batch
* @param response the protocol buffer MultiResponse to convert
* @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
*/
public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response,
final Map<Integer, Integer> indexMap, final MultiResponse response,
final CellScanner cells) throws IOException {
int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount = response.getRegionActionResultCount();
@ -149,18 +149,17 @@ public final class ResponseConverter {
Object responseValue;
// For RowMutations action, if there is an exception, the exception is set
// For RowMutations/CheckAndMutate action, if there is an exception, the exception is set
// at the RegionActionResult level and the ResultOrException is null at the original index
Integer rowMutationsIndex =
(rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
if (rowMutationsIndex != null) {
// This RegionAction is from a RowMutations in a batch.
Integer index = (indexMap == null ? null : indexMap.get(i));
if (index != null) {
// This RegionAction is from a RowMutations/CheckAndMutate in a batch.
// If there is an exception from the server, the exception is set at
// the RegionActionResult level, which has been handled above.
responseValue = response.getProcessed() ?
responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
results.add(regionName, rowMutationsIndex, responseValue);
results.add(regionName, index, responseValue);
continue;
}
@ -171,11 +170,11 @@ public final class ResponseConverter {
responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
} else if (roe.hasServiceResult()) {
responseValue = roe.getServiceResult();
} else{
} else {
// Sometimes, the response is just "it was processed". Generally, this occurs for things
// like mutateRows where either we get back 'processed' (or not) and optionally some
// statistics about the regions we touched.
responseValue = response.getProcessed() ?
responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
}

View File

@ -455,6 +455,7 @@ message RegionAction {
// When set, run mutations as atomic unit.
optional bool atomic = 2;
repeated Action action = 3;
optional Condition condition = 4;
}
/*
@ -499,6 +500,7 @@ message RegionActionResult {
repeated ResultOrException resultOrException = 1;
// If the operation failed globally for this region, this exception is set
optional NameBytesPair exception = 2;
optional bool processed = 3;
}
/**
@ -511,13 +513,16 @@ message RegionActionResult {
message MultiRequest {
repeated RegionAction regionAction = 1;
optional uint64 nonceGroup = 2;
optional Condition condition = 3;
// Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove
// it in the future.
optional Condition condition = 3 [deprecated=true];
}
message MultiResponse {
repeated RegionActionResult regionActionResult = 1;
// used for mutate to indicate processed only
optional bool processed = 2;
// Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to
// remove it in the future.
optional bool processed = 2 [deprecated=true];
optional MultiRegionLoadStats regionStatistics = 3;
}

View File

@ -454,6 +454,7 @@ message RegionAction {
// When set, run mutations as atomic unit.
optional bool atomic = 2;
repeated Action action = 3;
optional Condition condition = 4;
}
/*
@ -498,6 +499,7 @@ message RegionActionResult {
repeated ResultOrException resultOrException = 1;
// If the operation failed globally for this region, this exception is set
optional NameBytesPair exception = 2;
optional bool processed = 3;
}
/**
@ -510,13 +512,16 @@ message RegionActionResult {
message MultiRequest {
repeated RegionAction regionAction = 1;
optional uint64 nonceGroup = 2;
optional Condition condition = 3;
// Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove
// it in the future.
optional Condition condition = 3 [deprecated=true];
}
message MultiResponse {
repeated RegionActionResult regionActionResult = 1;
// used for mutate to indicate processed only
optional bool processed = 2;
// Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to
// remove it in the future.
optional bool processed = 2 [deprecated=true];
optional MultiRegionLoadStats regionStatistics = 3;
}

View File

@ -52,6 +52,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@ -811,6 +812,16 @@ public class RemoteHTable implements Table {
throw new UnsupportedOperationException("checkAndMutate not implemented");
}
@Override
public boolean checkAndMutate(CheckAndMutate checkAndMutate) {
throw new NotImplementedException("Implement later");
}
@Override
public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) {
throw new NotImplementedException("Implement later");
}
@Override
public Result increment(Increment increment) throws IOException {
throw new IOException("Increment not supported");

View File

@ -2730,6 +2730,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private void failRegionAction(MultiResponse.Builder responseBuilder,
RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
CellScanner cellScanner, Throwable error) {
rpcServer.getMetrics().exception(error);
regionActionResultBuilder.setException(ResponseConverter.buildException(error));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
// All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations.
if (cellScanner != null) {
skipCellsForMutations(regionAction.getActionList(), cellScanner);
}
}
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
@ -2758,43 +2772,103 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
// this will contain all the cells that we need to return. It's created later, if needed.
List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
Boolean processed = null;
RegionScannersCloseCallBack closeCallBack = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
this.rpcMultiRequestCount.increment();
this.requestCount.increment();
Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
.getRegionActionCount());
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
for (RegionAction regionAction : request.getRegionActionList()) {
// We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
// following logic is for backward compatibility as old clients still use
// MultiRequest#condition in case of checkAndMutate with RowMutations.
if (request.hasCondition()) {
if (request.getRegionActionList().isEmpty()) {
// If the region action list is empty, do nothing.
responseBuilder.setProcessed(true);
return responseBuilder.build();
}
RegionAction regionAction = request.getRegionAction(0);
// When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
// we can assume regionAction.getAtomic() is true here.
assert regionAction.getAtomic();
OperationQuota quota;
HRegion region;
regionActionResultBuilder.clear();
RegionSpecifier regionSpecifier = regionAction.getRegion();
try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
return responseBuilder.build();
}
try {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier =
condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) :
null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter =
condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
boolean processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
responseBuilder.setProcessed(processed);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
} finally {
quota.close();
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
// All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations.
skipCellsForMutations(regionAction.getActionList(), cellScanner);
ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
if (regionLoadStats != null) {
responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
.addRegion(regionSpecifier).addStat(regionLoadStats).build());
}
return responseBuilder.build();
}
// this will contain all the cells that we need to return. It's created later, if needed.
List<CellScannable> cellsToReturn = null;
RegionScannersCloseCallBack closeCallBack = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
.getRegionActionCount());
for (RegionAction regionAction : request.getRegionActionList()) {
OperationQuota quota;
HRegion region;
RegionSpecifier regionSpecifier = regionAction.getRegion();
regionActionResultBuilder.clear();
try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
}
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
// How does this call happen? It may need some work to play well w/ the surroundings.
// Need to return an item per Action along w/ Action index. TODO.
try {
if (request.hasCondition()) {
Condition condition = request.getCondition();
if (regionAction.hasCondition()) {
try {
Condition condition = regionAction.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
@ -2806,17 +2880,81 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
boolean processed;
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
// RowMutations
processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
} else {
if (regionAction.getActionList().isEmpty()) {
// If the region action list is empty, do nothing.
regionActionResultBuilder.setProcessed(true);
continue;
}
Action action = regionAction.getAction(0);
if (action.hasGet()) {
throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
+ action.getGet());
}
MutationProto mutation = action.getMutation();
switch (mutation.getMutateType()) {
case PUT:
Put put = ProtobufUtil.toPut(mutation, cellScanner);
checkCellSizeLimit(region, put);
// Throws an exception when violated
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
quota.addMutation(put);
if (filter != null) {
processed = region.checkAndMutate(row, filter, timeRange, put);
} else {
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
timeRange, put);
}
break;
case DELETE:
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
checkCellSizeLimit(region, delete);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
quota.addMutation(delete);
if (filter != null) {
processed = region.checkAndMutate(row, filter, timeRange, delete);
} else {
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
timeRange, delete);
}
break;
default:
throw new DoNotRetryIOException("CheckAndMutate doesn't support "
+ mutation.getMutateType());
}
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
regionActionResultBuilder.addResultOrException(
ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
}
regionActionResultBuilder.setProcessed(processed);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
} else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
try {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
}
regionActionResultBuilder.setProcessed(true);
// We no longer use MultiResponse#processed. Instead, we use
// RegionActionResult#condition. This is for backward compatibility for old clients.
responseBuilder.setProcessed(true);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's atomic, we may expect it's a global failure.
@ -2835,8 +2973,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
spaceQuotaEnforcement);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
} finally {
quota.close();
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
if(regionLoadStats != null) {
regionStats.put(regionSpecifier, regionLoadStats);
@ -2847,10 +2988,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
if (processed != null) {
responseBuilder.setProcessed(processed);
}
MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
builder.addRegion(stat.getKey());

View File

@ -246,9 +246,30 @@ public class TestAsyncTable {
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}
@Test
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = getTable.get();
RowMutations mutation = new RowMutations(row);
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
table.mutateRow(mutation).get();
Result result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
table.mutateRow(mutation).get();
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
}
// Tests for old checkAndMutate API
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
@Deprecated
public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
@ -271,7 +292,8 @@ public class TestAsyncTable {
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
@Deprecated
public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
@ -307,27 +329,10 @@ public class TestAsyncTable {
});
}
@Test
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = getTable.get();
RowMutations mutation = new RowMutations(row);
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
table.mutateRow(mutation).get();
Result result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
table.mutateRow(mutation).get();
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
@Deprecated
public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
@ -371,7 +376,8 @@ public class TestAsyncTable {
}
@Test
public void testCheckAndMutateWithTimeRange() throws Exception {
@Deprecated
public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
AsyncTable<?> table = getTable.get();
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(row);
@ -390,6 +396,7 @@ public class TestAsyncTable {
assertTrue(ok);
RowMutations rm = new RowMutations(row).add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenMutate(rm).get();
assertFalse(ok);
@ -410,7 +417,8 @@ public class TestAsyncTable {
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
@Deprecated
public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
@ -465,7 +473,8 @@ public class TestAsyncTable {
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
@Deprecated
public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
@ -536,7 +545,8 @@ public class TestAsyncTable {
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
@Deprecated
public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
@ -569,7 +579,8 @@ public class TestAsyncTable {
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
@Deprecated
public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
@ -599,11 +610,678 @@ public class TestAsyncTable {
}
@Test(expected = NullPointerException.class)
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
@Deprecated
public void testCheckAndMutateWithoutConditionForOldApi() {
getTable.get().checkAndMutate(row, FAMILY)
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
// Tests for new CheckAndMutate API
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
latch.countDown();
}));
latch.await();
assertEquals(1, successCount.get());
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.build(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
deleteLatch.countDown();
}));
deleteLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch mutateLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> {
RowMutations mutation = new RowMutations(row);
try {
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation
.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.build(mutation))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
mutateLatch.countDown();
});
});
mutateLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@Test
public void testCheckAndMutateWithTimeRange() throws Exception {
AsyncTable<?> table = getTable.get();
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(put)).get();
assertTrue(ok);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(put)).get();
assertFalse(ok);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(put)).get();
assertTrue(ok);
RowMutations rm = new RowMutations(row).add((Mutation) put);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(rm)).get();
assertFalse(ok);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(rm)).get();
assertTrue(ok);
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(delete)).get();
assertFalse(ok);
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(delete)).get();
assertTrue(ok);
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
.get();
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
// Tests for batch version of checkAndMutate
@Test
public void testCheckAndMutateBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
List<Boolean> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
.build(new Delete(row));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Delete(row2));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(row3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.build(new RowMutations(row3)
.add((Mutation) new Put(row3)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
.build(new RowMutations(row4)
.add((Mutation) new Put(row4)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
result = table.get(new Get(row3)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
result = table.get(new Get(row4)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndMutateBatch2() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get();
// Test for ifNotExists()
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
List<Boolean> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for ifMatches()
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for timeRange()
checkAndMutate1 = CheckAndMutate.newBuilder(row3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndMutateBatchWithFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row)
.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row2)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
List<Boolean> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row2)
.add((Mutation) new Put(row2)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
.add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2)).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
@Test
public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("d")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("e"))))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
List<Boolean> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0));
assertFalse(results.get(1));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();

View File

@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -333,4 +334,57 @@ public class TestAsyncTableBatch {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testWithCheckAndMutate() throws Exception {
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] row5 = Bytes.toBytes("row5");
table.putAll(Arrays.asList(
new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
RowMutations mutations = new RowMutations(row3)
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
List<Object> results = table.batchAll(actions).get();
assertTrue(((Result) results.get(0)).getExists());
assertEquals("b",
Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results.get(2)).getExists());
assertFalse(((Result) results.get(3)).getExists());
assertTrue(((Result) results.get(4)).isEmpty());
Result result = table.get(new Get(row1)).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row3)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
result = table.get(new Get(row4)).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row5)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
}
}

View File

@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -55,6 +57,9 @@ public class TestCheckAndMutate {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] ROWKEY = Bytes.toBytes("12345");
private static final byte[] ROWKEY2 = Bytes.toBytes("67890");
private static final byte[] ROWKEY3 = Bytes.toBytes("abcde");
private static final byte[] ROWKEY4 = Bytes.toBytes("fghij");
private static final byte[] FAMILY = Bytes.toBytes("cf");
@Rule
@ -129,8 +134,11 @@ public class TestCheckAndMutate {
return rm;
}
// Tests for old checkAndMutate API
@Test
public void testCheckAndMutate() throws Throwable {
@Deprecated
public void testCheckAndMutateForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@ -163,40 +171,8 @@ public class TestCheckAndMutate {
}
@Test
public void testCheckAndMutateWithBuilder() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// put the same row again with C column deleted
RowMutations rm = makeRowMutationsWithColumnCDeleted();
boolean res = table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A"))
.ifEquals(Bytes.toBytes("a")).thenMutate(rm);
assertTrue(res);
// get row back and assert the values
getOneRowAndAssertAllButCExist(table);
//Test that we get a region level exception
try {
rm = getBogusRowMutations();
table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A"))
.ifEquals(Bytes.toBytes("a")).thenMutate(rm);
fail("Expected NoSuchColumnFamilyException");
} catch (RetriesExhaustedWithDetailsException e) {
try {
throw e.getCause(0);
} catch (NoSuchColumnFamilyException e1) {
// expected
}
}
}
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
@Deprecated
public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@ -245,7 +221,8 @@ public class TestCheckAndMutate {
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
@Deprecated
public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@ -310,7 +287,8 @@ public class TestCheckAndMutate {
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
@Deprecated
public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
@ -341,7 +319,8 @@ public class TestCheckAndMutate {
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
@Deprecated
public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
@ -368,10 +347,533 @@ public class TestCheckAndMutate {
}
@Test(expected = NullPointerException.class)
public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
@Deprecated
public void testCheckAndMutateWithoutConditionForOldApi() throws Throwable {
try (Table table = createTable()) {
table.checkAndMutate(ROWKEY, FAMILY)
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
}
// Tests for new CheckAndMutate API
@Test
public void testCheckAndMutate() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// put the same row again with C column deleted
RowMutations rm = makeRowMutationsWithColumnCDeleted();
boolean res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(rm));
assertTrue(res);
// get row back and assert the values
getOneRowAndAssertAllButCExist(table);
// Test that we get a region level exception
try {
rm = getBogusRowMutations();
table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(rm));
fail("Expected NoSuchColumnFamilyException");
} catch (RetriesExhaustedWithDetailsException e) {
try {
throw e.getCause(0);
} catch (NoSuchColumnFamilyException e1) {
// expected
}
}
}
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY,
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
// Delete with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
// Mutate with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new RowMutations(ROWKEY)
.add((Mutation) new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(ok);
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
}
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
// get row back and assert the values
getOneRowAndAssertAllExist(table);
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
// Delete with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
// Mutate with success
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(ROWKEY)
.add((Mutation) new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(ok);
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
}
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY,
Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(ok);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(ok);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
}
}
@Test(expected = IllegalStateException.class)
public void testCheckAndMutateBuilderWithoutCondition() {
CheckAndMutate.newBuilder(ROWKEY)
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
// Tests for batch version of checkAndMutate
@Test
public void testCheckAndMutateBatch() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
.build(new Delete(ROWKEY));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Delete(ROWKEY2));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.build(new RowMutations(ROWKEY3)
.add((Mutation) new Put(ROWKEY3)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(ROWKEY3).addColumns(FAMILY, Bytes.toBytes("C"))));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
.build(new RowMutations(ROWKEY4)
.add((Mutation) new Put(ROWKEY4)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(ROWKEY4).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
result = table.get(new Get(ROWKEY3));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
result = table.get(new Get(ROWKEY4));
assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
}
@Test
public void testCheckAndMutateBatch2() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))));
// Test for ifNotExists()
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for ifMatches()
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
.build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for timeRange()
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.timeRange(TimeRange.between(0, 101))
.build(new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.timeRange(TimeRange.between(0, 100))
.build(new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
}
@Test
public void testCheckAndMutateBatchWithFilter() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(ROWKEY2)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("C")));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(ROWKEY)
.add((Mutation) new Put(ROWKEY)
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(ROWKEY2)
.add((Mutation) new Put(ROWKEY2)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
.add((Mutation) new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
result = table.get(new Get(ROWKEY));
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(ROWKEY2));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
}
@Test
public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.timeRange(TimeRange.between(0, 101))
.build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("d")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("e"))))
.timeRange(TimeRange.between(0, 100))
.build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results[0]);
assertFalse(results[1]);
Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
}
}

View File

@ -484,6 +484,60 @@ public class TestFromClientSide3 {
}
}
@Test
public void testBatchWithCheckAndMutate() throws Exception {
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] row5 = Bytes.toBytes("row5");
table.put(Arrays.asList(
new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
RowMutations mutations = new RowMutations(row3)
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
Object[] results = new Object[actions.size()];
table.batch(actions, results);
assertTrue(((Result) results[0]).getExists());
assertEquals("b",
Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results[2]).getExists());
assertFalse(((Result) results[3]).getExists());
assertTrue(((Result) results[4]).isEmpty());
Result result = table.get(new Get(row1));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row3));
assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
result = table.get(new Get(row4));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row5));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
}
}
@Test
public void testHTableExistsMethodSingleRegionSingleGet()
throws IOException, InterruptedException {

View File

@ -256,9 +256,8 @@ public class TestMalformedCellFromClient {
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
ClientProtos.MultiRequest request =
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
.setCondition(condition).build();
ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder()
.addRegionAction(builder.setCondition(condition).build()).build();
return request;
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@ -434,6 +435,16 @@ public class ThriftTable implements Table {
throw new NotImplementedException("Implement later");
}
@Override
public boolean checkAndMutate(CheckAndMutate checkAndMutate) {
throw new NotImplementedException("Implement later");
}
@Override
public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) {
throw new NotImplementedException("Implement later");
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);