diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 464eff54fcb..7e05b05c815 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -256,7 +256,7 @@ class AsyncBatchRpcRetryingCaller { } private ClientProtos.MultiRequest buildReq(Map actionsByRegion, - List cells, Map rowMutationsIndexMap) throws IOException { + List cells, Map indexMap) throws IOException { ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); @@ -264,14 +264,14 @@ class AsyncBatchRpcRetryingCaller { for (Map.Entry entry : actionsByRegion.entrySet()) { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); // multiRequestBuilder will be populated with region actions. - // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the + // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the // action list. RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions.stream() .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex())) .collect(Collectors.toList()), - cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, - rowMutationsIndexMap); + cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, + nonceGroup, indexMap); } return multiRequestBuilder.build(); } @@ -367,10 +367,10 @@ class AsyncBatchRpcRetryingCaller { List cells = new ArrayList<>(); // Map from a created RegionAction to the original index for a RowMutations within // the original list of actions. This will be used to process the results when there - // is RowMutations in the action list. - Map rowMutationsIndexMap = new HashMap<>(); + // is RowMutations/CheckAndMutate in the action list. + Map indexMap = new HashMap<>(); try { - req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); + req = buildReq(serverReq.actionsByRegion, cells, indexMap); } catch (IOException e) { onError(serverReq.actionsByRegion, tries, e, serverName); return; @@ -387,7 +387,7 @@ class AsyncBatchRpcRetryingCaller { } else { try { onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req, - rowMutationsIndexMap, resp, controller.cellScanner())); + indexMap, resp, controller.cellScanner())); } catch (Exception e) { onError(serverReq.actionsByRegion, tries, e, serverName); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index e10f1f82073..fd10603fa8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -231,12 +231,20 @@ public interface AsyncTable { * }); * * + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); /** * A helper class for sending checkAndMutate request. + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateBuilder { /** @@ -309,12 +317,20 @@ public interface AsyncTable { * }); * * + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); /** * A helper class for sending checkAndMutate request with a filter. + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateWithFilterBuilder { /** @@ -344,6 +360,37 @@ public interface AsyncTable { CompletableFuture thenMutate(RowMutations mutation); } + /** + * checkAndMutate that atomically checks if a row matches the specified condition. If it does, + * it performs the specified action. + * + * @param checkAndMutate The CheckAndMutate object. + * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. + */ + CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate); + + /** + * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense + * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed + * atomically (and thus, each may fail independently of others). + * + * @param checkAndMutates The list of CheckAndMutate. + * @return A list of {@link CompletableFuture}s that represent the result for each + * CheckAndMutate. + */ + List> checkAndMutate(List checkAndMutates); + + /** + * A simple version of batch checkAndMutate. It will fail if there are any failures. + * + * @param checkAndMutates The list of rows to apply. + * @return A {@link CompletableFuture} that wrapper the result boolean list. + */ + default CompletableFuture> checkAndMutateAll( + List checkAndMutates) { + return allOf(checkAndMutate(checkAndMutates)); + } + /** * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index d6406b6430d..836c9b52e11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -204,6 +204,17 @@ class AsyncTableImpl implements AsyncTable { }; } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + return wrap(rawTable.checkAndMutate(checkAndMutate)); + } + + @Override + public List> checkAndMutate(List checkAndMutates) { + return rawTable.checkAndMutate(checkAndMutates).stream() + .map(this::wrap).collect(toList()); + } + @Override public CompletableFuture mutateRow(RowMutations mutation) { return wrap(rawTable.mutateRow(mutation)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java new file mode 100644 index 00000000000..5fce3e2eacb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete} + * and {@link RowMutations} are supported. + *

+ * Use the builder class to instantiate a CheckAndMutate object. + * This builder class is fluent style APIs, the code are like: + *

+ * 
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row equals to the specified value
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifEquals(family, qualifier, value)
+ *   .build(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row doesn't exist
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifNotExists(family, qualifier)
+ *   .build(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the row matches the filter
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ *   .ifMatches(filter)
+ *   .build(delete);
+ * 
+ * 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class CheckAndMutate extends Mutation { + + /** + * A builder class for building a CheckAndMutate object. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public static final class Builder { + private final byte[] row; + private byte[] family; + private byte[] qualifier; + private CompareOperator op; + private byte[] value; + private Filter filter; + private TimeRange timeRange; + + private Builder(byte[] row) { + this.row = Preconditions.checkNotNull(row, "row is null"); + } + + /** + * Check for lack of column + * + * @param family family to check + * @param qualifier qualifier to check + * @return the CheckAndMutate object + */ + public Builder ifNotExists(byte[] family, byte[] qualifier) { + return ifEquals(family, qualifier, null); + } + + /** + * Check for equality + * + * @param family family to check + * @param qualifier qualifier to check + * @param value the expected value + * @return the CheckAndMutate object + */ + public Builder ifEquals(byte[] family, byte[] qualifier, byte[] value) { + return ifMatches(family, qualifier, CompareOperator.EQUAL, value); + } + + /** + * @param family family to check + * @param qualifier qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @return the CheckAndMutate object + */ + public Builder ifMatches(byte[] family, byte[] qualifier, CompareOperator compareOp, + byte[] value) { + this.family = Preconditions.checkNotNull(family, "family is null"); + this.qualifier = qualifier; + this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); + this.value = value; + return this; + } + + /** + * @param filter filter to check + * @return the CheckAndMutate object + */ + public Builder ifMatches(Filter filter) { + this.filter = Preconditions.checkNotNull(filter, "filter is null"); + return this; + } + + /** + * @param timeRange time range to check + * @return the CheckAndMutate object + */ + public Builder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + private void preCheck(Row action) { + Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null"); + if (!Bytes.equals(row, action.getRow())) { + throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" + + Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" + + Bytes.toStringBinary(this.row) + ">"); + } + Preconditions.checkState(op != null || filter != null, "condition is null. You need to" + + " specify the condition by calling ifNotExists/ifEquals/ifMatches before building a" + + " CheckAndMutate object"); + } + + /** + * @param put data to put if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Put put) { + preCheck(put); + if (filter != null) { + return new CheckAndMutate(row, filter, timeRange, put); + } else { + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put); + } + } + + /** + * @param delete data to delete if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(Delete delete) { + preCheck(delete); + if (filter != null) { + return new CheckAndMutate(row, filter, timeRange, delete); + } else { + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete); + } + } + + /** + * @param mutation mutations to perform if check succeeds + * @return a CheckAndMutate object + */ + public CheckAndMutate build(RowMutations mutation) { + preCheck(mutation); + if (filter != null) { + return new CheckAndMutate(row, filter, timeRange, mutation); + } else { + return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation); + } + } + } + + /** + * returns a builder object to build a CheckAndMutate object + * + * @param row row + * @return a builder object + */ + public static Builder newBuilder(byte[] row) { + return new Builder(row); + } + + private final byte[] family; + private final byte[] qualifier; + private final CompareOperator op; + private final byte[] value; + private final Filter filter; + private final TimeRange timeRange; + private final Row action; + + private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op, + byte[] value, TimeRange timeRange, Row action) { + super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + this.family = family; + this.qualifier = qualifier; + this.op = op; + this.value = value; + this.filter = null; + this.timeRange = timeRange; + this.action = action; + } + + private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) { + super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap()); + this.family = null; + this.qualifier = null; + this.op = null; + this.value = null; + this.filter = filter; + this.timeRange = timeRange; + this.action = action; + } + + /** + * @return the family to check + */ + public byte[] getFamily() { + return family; + } + + /** + * @return the qualifier to check + */ + public byte[] getQualifier() { + return qualifier; + } + + /** + * @return the comparison operator + */ + public CompareOperator getCompareOp() { + return op; + } + + /** + * @return the expected value + */ + public byte[] getValue() { + return value; + } + + /** + * @return the filter to check + */ + public Filter getFilter() { + return filter; + } + + /** + * @return the time range to check + */ + public TimeRange getTimeRange() { + return timeRange; + } + + /** + * @return the action done if check succeeds + */ + public Row getAction() { + return action; + } + + @Override + public NavigableMap> getFamilyCellMap() { + if (action instanceof Mutation) { + return ((Mutation) action).getFamilyCellMap(); + } + throw new UnsupportedOperationException(); + } + + @Override + public long getTimestamp() { + if (action instanceof Mutation) { + return ((Mutation) action).getTimestamp(); + } + throw new UnsupportedOperationException(); + } + + @Override + public Mutation setTimestamp(long timestamp) { + if (action instanceof Mutation) { + return ((Mutation) action).setTimestamp(timestamp); + } + throw new UnsupportedOperationException(); + } + + @Override + public Durability getDurability() { + if (action instanceof Mutation) { + return ((Mutation) action).getDurability(); + } + throw new UnsupportedOperationException(); + } + + @Override + public Mutation setDurability(Durability d) { + if (action instanceof Mutation) { + return ((Mutation) action).setDurability(d); + } + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getAttribute(String name) { + if (action instanceof Mutation) { + return ((Mutation) action).getAttribute(name); + } + throw new UnsupportedOperationException(); + } + + @Override + public OperationWithAttributes setAttribute(String name, byte[] value) { + if (action instanceof Mutation) { + return ((Mutation) action).setAttribute(name, value); + } + throw new UnsupportedOperationException(); + } + + @Override + public int getPriority() { + if (action instanceof Mutation) { + return ((Mutation) action).getPriority(); + } + return ((RowMutations) action).getMaxPriority(); + } + + @Override + public OperationWithAttributes setPriority(int priority) { + if (action instanceof Mutation) { + return ((Mutation) action).setPriority(priority); + } + throw new UnsupportedOperationException(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 9b30de60baf..5f02260036d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -772,11 +772,13 @@ public class HTable implements Table { } @Override + @Deprecated public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { return new CheckAndMutateBuilderImpl(row, family); } @Override + @Deprecated public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { return new CheckAndMutateWithFilterBuilderImpl(row, filter); } @@ -849,6 +851,52 @@ public class HTable implements Table { return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm); } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + Row action = checkAndMutate.getAction(); + if (action instanceof Put) { + Put put = (Put) action; + validatePut(put); + return doCheckAndPut(checkAndMutate.getRow(), checkAndMutate.getFamily(), + checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), + checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), put); + } else if (action instanceof Delete) { + return doCheckAndDelete(checkAndMutate.getRow(), checkAndMutate.getFamily(), + checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), + checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Delete) action); + } else { + return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), + checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), + checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action); + } + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) throws IOException { + if (checkAndMutates.isEmpty()) { + return new boolean[]{}; + } + if (checkAndMutates.size() == 1) { + return new boolean[]{ checkAndMutate(checkAndMutates.get(0)) }; + } + + Object[] results = new Object[checkAndMutates.size()]; + try { + batch(checkAndMutates, results, writeRpcTimeoutMs); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + + // translate. + boolean[] ret = new boolean[results.length]; + int i = 0; + for (Object r : results) { + // Batch ensures if there is a failure we get an exception instead + ret[i++] = ((Result) r).getExists(); + } + return ret; + } + private CompareOperator toCompareOperator(CompareOp compareOp) { switch (compareOp) { case LESS: diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index bfc161e5c23..0812f2ab25f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -99,10 +99,10 @@ class MultiServerCallable extends CancellableRegionServerCallable long nonceGroup = multiAction.getNonceGroup(); - // Map from a created RegionAction to the original index for a RowMutations within - // the original list of actions. This will be used to process the results when there - // is RowMutations in the action list. - Map rowMutationsIndexMap = new HashMap<>(); + // Map from a created RegionAction to the original index for a RowMutations/CheckAndMutate + // within the original list of actions. This will be used to process the results when there + // is RowMutations/CheckAndMutate in the action list. + Map indexMap = new HashMap<>(); // The multi object is a list of Actions by region. Iterate by region. for (Map.Entry> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); @@ -110,17 +110,16 @@ class MultiServerCallable extends CancellableRegionServerCallable if (this.cellBlock) { // Send data in cellblocks. // multiRequestBuilder will be populated with region actions. - // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the - // action list. + // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in + // the action list. RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder, - regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap); - } - else { + regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, indexMap); + } else { // multiRequestBuilder will be populated with region actions. - // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the - // action list. + // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in + // the action list. RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder, - regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap); + regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, indexMap); } } @@ -130,7 +129,7 @@ class MultiServerCallable extends CancellableRegionServerCallable ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto); if (responseProto == null) return null; // Occurs on cancel - return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, responseProto, + return ResponseConverter.getResults(requestProto, indexMap, responseProto, getRpcControllerCellScanner()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index 8bfdf891d6b..de85ddd9759 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -149,10 +149,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C * @return a list of Cell objects, returns an empty list if one doesn't exist. */ List getCellList(byte[] family) { - List list = this.familyMap.get(family); + List list = getFamilyCellMap().get(family); if (list == null) { list = new ArrayList<>(); - this.familyMap.put(family, list); + getFamilyCellMap().put(family, list); } return list; } @@ -201,11 +201,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C @Override public Map getFingerprint() { Map map = new HashMap<>(); - List families = new ArrayList<>(this.familyMap.entrySet().size()); + List families = new ArrayList<>(getFamilyCellMap().entrySet().size()); // ideally, we would also include table information, but that information // is not stored in each Operation instance. map.put("families", families); - for (Map.Entry> entry : this.familyMap.entrySet()) { + for (Map.Entry> entry : getFamilyCellMap().entrySet()) { families.add(Bytes.toStringBinary(entry.getKey())); } return map; @@ -229,7 +229,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C map.put("row", Bytes.toStringBinary(this.row)); int colCount = 0; // iterate through all column families affected - for (Map.Entry> entry : this.familyMap.entrySet()) { + for (Map.Entry> entry : getFamilyCellMap().entrySet()) { // map from this family to details for each cell affected within the family List> qualifierDetails = new ArrayList<>(); columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails); @@ -319,7 +319,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C * @return true if empty, false otherwise */ public boolean isEmpty() { - return familyMap.isEmpty(); + return getFamilyCellMap().isEmpty(); } /** @@ -461,7 +461,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C */ public int size() { int size = 0; - for (List cells : this.familyMap.values()) { + for (List cells : getFamilyCellMap().values()) { size += cells.size(); } return size; @@ -471,7 +471,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C * @return the number of different families */ public int numFamilies() { - return familyMap.size(); + return getFamilyCellMap().size(); } /** @@ -485,8 +485,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C // Adding map overhead heapsize += - ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY); - for(Map.Entry> entry : this.familyMap.entrySet()) { + ClassSize.align(getFamilyCellMap().size() * ClassSize.MAP_ENTRY); + for(Map.Entry> entry : getFamilyCellMap().entrySet()) { //Adding key overhead heapsize += ClassSize.align(ClassSize.ARRAY + entry.getKey().length); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index b4a6cc47048..dc3e530cceb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -396,7 +397,6 @@ class RawAsyncTableImpl implements AsyncTable { return new CheckAndMutateBuilderImpl(row, family); } - private final class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder { @@ -458,6 +458,54 @@ class RawAsyncTableImpl implements AsyncTable { return new CheckAndMutateWithFilterBuilderImpl(row, filter); } + @Override + public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + if (checkAndMutate.getAction() instanceof Put) { + validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); + } + if (checkAndMutate.getAction() instanceof Put || + checkAndMutate.getAction() instanceof Delete) { + Mutation mutation = (Mutation) checkAndMutate.getAction(); + if (mutation instanceof Put) { + validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); + } + return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), + mutation.getPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, + loc, stub, mutation, + (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), + checkAndMutate.getTimeRange(), m), + (c, r) -> r.getProcessed())) + .call(); + } else if (checkAndMutate.getAction() instanceof RowMutations) { + RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); + return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), + rowMutations.getMaxPriority(), rpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, + loc, stub, rowMutations, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), + checkAndMutate.getFamily(), checkAndMutate.getQualifier(), + checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), + checkAndMutate.getTimeRange(), rm), + resp -> resp.getExists())) + .call(); + } else { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new DoNotRetryIOException( + "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); + return future; + } + } + + @Override + public List> checkAndMutate(List checkAndMutates) { + return batch(checkAndMutates, rpcTimeoutNs).stream() + .map(f -> f.thenApply(r -> ((Result)r).getExists())) + .collect(toList()); + } + // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, // so here I write a new method as I do not want to change the abstraction of call method. private CompletableFuture mutateRow(HBaseRpcController controller, @@ -594,8 +642,16 @@ class RawAsyncTableImpl implements AsyncTable { } private List> batch(List actions, long rpcTimeoutNs) { - actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action) - .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize())); + for (Row action : actions) { + if (action instanceof Put) { + validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); + } else if (action instanceof CheckAndMutate) { + CheckAndMutate checkAndMutate = (CheckAndMutate) action; + if (checkAndMutate.getAction() instanceof Put) { + validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); + } + } + } return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index cc2d4145258..b21998029d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -492,14 +492,22 @@ public interface Table extends Closeable { * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put); * * + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated default CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { throw new NotImplementedException("Add an implementation!"); } /** * A helper class for sending checkAndMutate request. + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateBuilder { /** @@ -562,14 +570,22 @@ public interface Table extends Closeable { * table.checkAndMutate(row, filter).thenPut(put); * * + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { throw new NotImplementedException("Add an implementation!"); } /** * A helper class for sending checkAndMutate request with a filter. + * + * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated interface CheckAndMutateWithFilterBuilder { /** @@ -596,6 +612,31 @@ public interface Table extends Closeable { boolean thenMutate(RowMutations mutation) throws IOException; } + /** + * checkAndMutate that atomically checks if a row matches the specified condition. If it does, + * it performs the specified action. + * + * @param checkAndMutate The CheckAndMutate object. + * @return boolean that represents the result for the CheckAndMutate. + * @throws IOException if a remote or network exception occurs. + */ + default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + return checkAndMutate(Collections.singletonList(checkAndMutate))[0]; + } + + /** + * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense + * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed + * atomically (and thus, each may fail independently of others). + * + * @param checkAndMutates The list of CheckAndMutate. + * @return A array of boolean that represents the result for each CheckAndMutate. + * @throws IOException if a remote or network exception occurs. + */ + default boolean[] checkAndMutate(List checkAndMutates) throws IOException { + throw new NotImplementedException("Add an implementation!"); + } + /** * Performs multiple mutations atomically on a single row. Currently * {@link Put} and {@link Delete} are supported. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9657d6eb805..f41af094263 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -237,37 +238,20 @@ public final class RequestConverter { } /** - * Create a protocol buffer MutateRequest for a conditioned put + * Create a protocol buffer MutateRequest for a conditioned put/delete * * @return a mutate request * @throws IOException */ - public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, - final TimeRange timeRange, final Put put) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, - put, MutationType.PUT); - } - - /** - * Create a protocol buffer MutateRequest for a conditioned delete - * - * @return a mutate request - * @throws IOException - */ - public static MutateRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter, - final TimeRange timeRange, final Delete delete) throws IOException { - return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange, - delete, MutationType.DELETE); - } - public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, - final Filter filter, final TimeRange timeRange, final Mutation mutation, - final MutationType type) throws IOException { + final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException { + MutationType type; + if (mutation instanceof Put) { + type = MutationType.PUT; + } else { + type = MutationType.DELETE; + } return MutateRequest.newBuilder() .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) .setMutation(ProtobufUtil.toMutation(type, mutation)) @@ -306,9 +290,8 @@ public final class RequestConverter { actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) - .build(); + return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition( + buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build(); } /** @@ -426,42 +409,6 @@ public final class RequestConverter { return builder; } - /** - * Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells - * are carried outside of protobuf. Return references to the Cells in cells param. - * Does not propagate Action absolute position. Does not set atomic action on the created - * RegionAtomic. Caller should do that if wanted. - * @param regionName - * @param rowMutations - * @param cells Return in here a list of Cells as CellIterable. - * @return a region mutation minus data - * @throws IOException - */ - public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, - final RowMutations rowMutations, final List cells, - final RegionAction.Builder regionActionBuilder, - final ClientProtos.Action.Builder actionBuilder, - final MutationProto.Builder mutationBuilder) - throws IOException { - for (Mutation mutation: rowMutations.getMutations()) { - MutationType type = null; - if (mutation instanceof Put) { - type = MutationType.PUT; - } else if (mutation instanceof Delete) { - type = MutationType.DELETE; - } else { - throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + - mutation.getClass().getName()); - } - mutationBuilder.clear(); - MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); - cells.add(mutation); - actionBuilder.clear(); - regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); - } - return regionActionBuilder; - } - public static RegionAction.Builder getRegionActionBuilderWithRegion( final RegionAction.Builder regionActionBuilder, final byte [] regionName) { RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); @@ -627,8 +574,8 @@ public final class RequestConverter { * @param actionBuilder actionBuilder to be used to build action. * @param mutationBuilder mutationBuilder to be used to build mutation. * @param nonceGroup nonceGroup to be applied. - * @param rowMutationsIndexMap Map of created RegionAction to the original index for a - * RowMutations within the original list of actions + * @param indexMap Map of created RegionAction to the original index for a + * RowMutations/CheckAndMutate within the original list of actions * @throws IOException */ public static void buildRegionActions(final byte[] regionName, @@ -636,13 +583,14 @@ public final class RequestConverter { final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder, - long nonceGroup, final Map rowMutationsIndexMap) throws IOException { + long nonceGroup, final Map indexMap) throws IOException { regionActionBuilder.clear(); RegionAction.Builder builder = getRegionActionBuilderWithRegion( regionActionBuilder, regionName); ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null; boolean hasNonce = false; List rowMutationsList = new ArrayList<>(); + List checkAndMutates = new ArrayList<>(); for (Action action: actions) { Row row = action.getAction(); @@ -684,6 +632,8 @@ public final class RequestConverter { .setRequest(value))); } else if (row instanceof RowMutations) { rowMutationsList.add(action); + } else if (row instanceof CheckAndMutate) { + checkAndMutates.add(action); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -699,17 +649,78 @@ public final class RequestConverter { // on the one row. We do separate RegionAction for each RowMutations. // We maintain a map to keep track of this RegionAction and the original Action index. for (Action action : rowMutationsList) { - RowMutations rms = (RowMutations) action.getAction(); - RegionAction.Builder rowMutationsRegionActionBuilder = - RequestConverter.buildRegionAction(regionName, rms); - rowMutationsRegionActionBuilder.setAtomic(true); - // Put it in the multiRequestBuilder - multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build()); + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + + buildRegionAction((RowMutations) action.getAction(), builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); + + multiRequestBuilder.addRegionAction(builder.build()); + // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) // in the overall multiRequest. - rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1, + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); } + + // Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each + // CheckAndMutate and maintain a map to keep track of this RegionAction and the original + // Action index. + for (Action action : checkAndMutates) { + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + + CheckAndMutate cam = (CheckAndMutate) action.getAction(); + builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(), + cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange())); + + if (cam.getAction() instanceof Put) { + actionBuilder.clear(); + mutationBuilder.clear(); + builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, + (Put) cam.getAction(), mutationBuilder))); + } else if (cam.getAction() instanceof Delete) { + actionBuilder.clear(); + mutationBuilder.clear(); + builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, + (Delete) cam.getAction(), mutationBuilder))); + } else if (cam.getAction() instanceof RowMutations) { + buildRegionAction((RowMutations) cam.getAction(), builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); + } else { + throw new DoNotRetryIOException("CheckAndMutate doesn't support " + + cam.getAction().getClass().getName()); + } + + multiRequestBuilder.addRegionAction(builder.build()); + + // This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1) + // in the overall multiRequest. + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); + } + } + + private static void buildRegionAction(final RowMutations rowMutations, + final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) + throws IOException { + for (Mutation mutation: rowMutations.getMutations()) { + MutationType mutateType; + if (mutation instanceof Put) { + mutateType = MutationType.PUT; + } else if (mutation instanceof Delete) { + mutateType = MutationType.DELETE; + } else { + throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + + mutation.getClass().getName()); + } + mutationBuilder.clear(); + MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder); + actionBuilder.clear(); + regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); + } } /** @@ -729,8 +740,8 @@ public final class RequestConverter { * @param actionBuilder actionBuilder to be used to build action. * @param mutationBuilder mutationBuilder to be used to build mutation. * @param nonceGroup nonceGroup to be applied. - * @param rowMutationsIndexMap Map of created RegionAction to the original index for a - * RowMutations within the original list of actions + * @param indexMap Map of created RegionAction to the original index for a + * RowMutations/CheckAndMutate within the original list of actions * @throws IOException */ public static void buildNoDataRegionActions(final byte[] regionName, @@ -739,14 +750,14 @@ public final class RequestConverter { final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder, - long nonceGroup, final Map rowMutationsIndexMap) throws IOException { + long nonceGroup, final Map indexMap) throws IOException { regionActionBuilder.clear(); RegionAction.Builder builder = getRegionActionBuilderWithRegion( regionActionBuilder, regionName); ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null; - RegionAction.Builder rowMutationsRegionActionBuilder = null; boolean hasNonce = false; List rowMutationsList = new ArrayList<>(); + List checkAndMutates = new ArrayList<>(); for (Action action: actions) { Row row = action.getAction(); @@ -757,26 +768,9 @@ public final class RequestConverter { Get g = (Get)row; builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g))); } else if (row instanceof Put) { - Put p = (Put)row; - cells.add(p); - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder))); + buildNoDataRegionAction((Put) row, cells, builder, actionBuilder, mutationBuilder); } else if (row instanceof Delete) { - Delete d = (Delete)row; - int size = d.size(); - // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing - // in it but the row to delete. In this case, the current implementation does not make - // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases - // where the size returned is zero, we will send the Delete fully pb'd rather than have - // metadata only in the pb and then send the kv along the side in cells. - if (size > 0) { - cells.add(d); - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder))); - } else { - builder.addAction(actionBuilder. - setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder))); - } + buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder); } else if (row instanceof Append) { Append a = (Append)row; cells.add(a); @@ -807,6 +801,8 @@ public final class RequestConverter { .setRequest(value))); } else if (row instanceof RowMutations) { rowMutationsList.add(action); + } else if (row instanceof CheckAndMutate) { + checkAndMutates.add(action); } else { throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); } @@ -822,23 +818,106 @@ public final class RequestConverter { // on the one row. We do separate RegionAction for each RowMutations. // We maintain a map to keep track of this RegionAction and the original Action index. for (Action action : rowMutationsList) { - RowMutations rms = (RowMutations) action.getAction(); - if (rowMutationsRegionActionBuilder == null) { - rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder(); - } else { - rowMutationsRegionActionBuilder.clear(); - } - rowMutationsRegionActionBuilder.setRegion( - RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); - rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, - cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder); - rowMutationsRegionActionBuilder.setAtomic(true); - // Put it in the multiRequestBuilder - multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build()); + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + + buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); + + multiRequestBuilder.addRegionAction(builder.build()); + // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1) // in the overall multiRequest. - rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1, - action.getOriginalIndex()); + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); + } + + // Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each + // CheckAndMutate and maintain a map to keep track of this RegionAction and the original + // Action index. + for (Action action : checkAndMutates) { + builder.clear(); + getRegionActionBuilderWithRegion(builder, regionName); + + CheckAndMutate cam = (CheckAndMutate) action.getAction(); + builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(), + cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange())); + + if (cam.getAction() instanceof Put) { + actionBuilder.clear(); + mutationBuilder.clear(); + buildNoDataRegionAction((Put) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + } else if (cam.getAction() instanceof Delete) { + actionBuilder.clear(); + mutationBuilder.clear(); + buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + } else if (cam.getAction() instanceof RowMutations) { + buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder, + mutationBuilder); + builder.setAtomic(true); + } else { + throw new DoNotRetryIOException("CheckAndMutate doesn't support " + + cam.getAction().getClass().getName()); + } + + multiRequestBuilder.addRegionAction(builder.build()); + + // This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1) + // in the overall multiRequest. + indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex()); + } + } + + private static void buildNoDataRegionAction(final Put put, final List cells, + final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, + final MutationProto.Builder mutationBuilder) throws IOException { + cells.add(put); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, put, mutationBuilder))); + } + + private static void buildNoDataRegionAction(final Delete delete, + final List cells, final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) + throws IOException { + int size = delete.size(); + // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing + // in it but the row to delete. In this case, the current implementation does not make + // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases + // where the size returned is zero, we will send the Delete fully pb'd rather than have + // metadata only in the pb and then send the kv along the side in cells. + if (size > 0) { + cells.add(delete); + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, delete, mutationBuilder))); + } else { + regionActionBuilder.addAction(actionBuilder. + setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, mutationBuilder))); + } + } + + private static void buildNoDataRegionAction(final RowMutations rowMutations, + final List cells, final RegionAction.Builder regionActionBuilder, + final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) + throws IOException { + for (Mutation mutation: rowMutations.getMutations()) { + MutationType type; + if (mutation instanceof Put) { + type = MutationType.PUT; + } else if (mutation instanceof Delete) { + type = MutationType.DELETE; + } else { + throw new DoNotRetryIOException("RowMutations supports only put and delete, not " + + mutation.getClass().getName()); + } + mutationBuilder.clear(); + MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder); + cells.add(mutation); + actionBuilder.clear(); + regionActionBuilder.addAction(actionBuilder.setMutation(mp).build()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index 12661a6562d..b414b7b6dbb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -105,14 +105,14 @@ public final class ResponseConverter { * Get the results from a protocol buffer MultiResponse * * @param request the original protocol buffer MultiRequest - * @param rowMutationsIndexMap Used to support RowMutations in batch + * @param indexMap Used to support RowMutations/CheckAndMutate in batch * @param response the protocol buffer MultiResponse to convert * @param cells Cells to go with the passed in proto. Can be null. * @return the results that were in the MultiResponse (a Result or an Exception). * @throws IOException */ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, - final Map rowMutationsIndexMap, final MultiResponse response, + final Map indexMap, final MultiResponse response, final CellScanner cells) throws IOException { int requestRegionActionCount = request.getRegionActionCount(); int responseRegionActionResultCount = response.getRegionActionResultCount(); @@ -149,18 +149,17 @@ public final class ResponseConverter { Object responseValue; - // For RowMutations action, if there is an exception, the exception is set + // For RowMutations/CheckAndMutate action, if there is an exception, the exception is set // at the RegionActionResult level and the ResultOrException is null at the original index - Integer rowMutationsIndex = - (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i)); - if (rowMutationsIndex != null) { - // This RegionAction is from a RowMutations in a batch. + Integer index = (indexMap == null ? null : indexMap.get(i)); + if (index != null) { + // This RegionAction is from a RowMutations/CheckAndMutate in a batch. // If there is an exception from the server, the exception is set at // the RegionActionResult level, which has been handled above. - responseValue = response.getProcessed() ? + responseValue = actionResult.getProcessed() ? ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; - results.add(regionName, rowMutationsIndex, responseValue); + results.add(regionName, index, responseValue); continue; } @@ -171,11 +170,11 @@ public final class ResponseConverter { responseValue = ProtobufUtil.toResult(roe.getResult(), cells); } else if (roe.hasServiceResult()) { responseValue = roe.getServiceResult(); - } else{ + } else { // Sometimes, the response is just "it was processed". Generally, this occurs for things // like mutateRows where either we get back 'processed' (or not) and optionally some // statistics about the regions we touched. - responseValue = response.getProcessed() ? + responseValue = actionResult.getProcessed() ? ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE; } diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 810aaaa393a..1b8b98642b4 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -455,6 +455,7 @@ message RegionAction { // When set, run mutations as atomic unit. optional bool atomic = 2; repeated Action action = 3; + optional Condition condition = 4; } /* @@ -499,6 +500,7 @@ message RegionActionResult { repeated ResultOrException resultOrException = 1; // If the operation failed globally for this region, this exception is set optional NameBytesPair exception = 2; + optional bool processed = 3; } /** @@ -511,13 +513,16 @@ message RegionActionResult { message MultiRequest { repeated RegionAction regionAction = 1; optional uint64 nonceGroup = 2; - optional Condition condition = 3; + // Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove + // it in the future. + optional Condition condition = 3 [deprecated=true]; } message MultiResponse { repeated RegionActionResult regionActionResult = 1; - // used for mutate to indicate processed only - optional bool processed = 2; + // Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to + // remove it in the future. + optional bool processed = 2 [deprecated=true]; optional MultiRegionLoadStats regionStatistics = 3; } diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 9f66f702627..b2a6ec39fe8 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -454,6 +454,7 @@ message RegionAction { // When set, run mutations as atomic unit. optional bool atomic = 2; repeated Action action = 3; + optional Condition condition = 4; } /* @@ -498,6 +499,7 @@ message RegionActionResult { repeated ResultOrException resultOrException = 1; // If the operation failed globally for this region, this exception is set optional NameBytesPair exception = 2; + optional bool processed = 3; } /** @@ -510,13 +512,16 @@ message RegionActionResult { message MultiRequest { repeated RegionAction regionAction = 1; optional uint64 nonceGroup = 2; - optional Condition condition = 3; + // Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove + // it in the future. + optional Condition condition = 3 [deprecated=true]; } message MultiResponse { repeated RegionActionResult regionActionResult = 1; - // used for mutate to indicate processed only - optional bool processed = 2; + // Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to + // remove it in the future. + optional bool processed = 2 [deprecated=true]; optional MultiRegionLoadStats regionStatistics = 3; } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index e076b1ae914..cd9ec7fc269 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -52,6 +52,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -811,6 +812,16 @@ public class RemoteHTable implements Table { throw new UnsupportedOperationException("checkAndMutate not implemented"); } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) { + throw new NotImplementedException("Implement later"); + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) { + throw new NotImplementedException("Implement later"); + } + @Override public Result increment(Increment increment) throws IOException { throw new IOException("Increment not supported"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 8cab3fea4b0..b48e65782da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2730,6 +2730,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void failRegionAction(MultiResponse.Builder responseBuilder, + RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, + CellScanner cellScanner, Throwable error) { + rpcServer.getMetrics().exception(error); + regionActionResultBuilder.setException(ResponseConverter.buildException(error)); + responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + // All Mutations in this RegionAction not executed as we can not see the Region online here + // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner + // corresponding to these Mutations. + if (cellScanner != null) { + skipCellsForMutations(regionAction.getActionList(), cellScanner); + } + } + /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor * @@ -2758,43 +2772,103 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; - // this will contain all the cells that we need to return. It's created later, if needed. - List cellsToReturn = null; MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); - Boolean processed = null; - RegionScannersCloseCallBack closeCallBack = null; - RpcCallContext context = RpcServer.getCurrentCall().orElse(null); this.rpcMultiRequestCount.increment(); this.requestCount.increment(); - Map regionStats = new HashMap<>(request - .getRegionActionCount()); ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); - for (RegionAction regionAction : request.getRegionActionList()) { + + // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The + // following logic is for backward compatibility as old clients still use + // MultiRequest#condition in case of checkAndMutate with RowMutations. + if (request.hasCondition()) { + if (request.getRegionActionList().isEmpty()) { + // If the region action list is empty, do nothing. + responseBuilder.setProcessed(true); + return responseBuilder.build(); + } + + RegionAction regionAction = request.getRegionAction(0); + + // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So + // we can assume regionAction.getAtomic() is true here. + assert regionAction.getAtomic(); + OperationQuota quota; HRegion region; - regionActionResultBuilder.clear(); RegionSpecifier regionSpecifier = regionAction.getRegion(); + try { region = getRegion(regionSpecifier); quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); + } catch (IOException e) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); + return responseBuilder.build(); + } + + try { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; + byte[] qualifier = + condition.hasQualifier() ? condition.getQualifier().toByteArray() : null; + CompareOperator op = condition.hasCompareType() ? + CompareOperator.valueOf(condition.getCompareType().name()) : + null; + ByteArrayComparable comparator = condition.hasComparator() ? + ProtobufUtil.toComparator(condition.getComparator()) : null; + Filter filter = + condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null; + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); + boolean processed = + checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, + qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, + spaceQuotaEnforcement); + responseBuilder.setProcessed(processed); } catch (IOException e) { rpcServer.getMetrics().exception(e); + // As it's an atomic operation with a condition, we may expect it's a global failure. regionActionResultBuilder.setException(ResponseConverter.buildException(e)); - responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); - // All Mutations in this RegionAction not executed as we can not see the Region online here - // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner - // corresponding to these Mutations. - skipCellsForMutations(regionAction.getActionList(), cellScanner); + } finally { + quota.close(); + } + + responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); + if (regionLoadStats != null) { + responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder() + .addRegion(regionSpecifier).addStat(regionLoadStats).build()); + } + return responseBuilder.build(); + } + + // this will contain all the cells that we need to return. It's created later, if needed. + List cellsToReturn = null; + RegionScannersCloseCallBack closeCallBack = null; + RpcCallContext context = RpcServer.getCurrentCall().orElse(null); + Map regionStats = new HashMap<>(request + .getRegionActionCount()); + + for (RegionAction regionAction : request.getRegionActionList()) { + OperationQuota quota; + HRegion region; + RegionSpecifier regionSpecifier = regionAction.getRegion(); + regionActionResultBuilder.clear(); + + try { + region = getRegion(regionSpecifier); + quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); + } catch (IOException e) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); continue; // For this region it's a failure. } - if (regionAction.hasAtomic() && regionAction.getAtomic()) { - // How does this call happen? It may need some work to play well w/ the surroundings. - // Need to return an item per Action along w/ Action index. TODO. - try { - if (request.hasCondition()) { - Condition condition = request.getCondition(); + try { + if (regionAction.hasCondition()) { + try { + Condition condition = regionAction.getCondition(); byte[] row = condition.getRow().toByteArray(); byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null; byte[] qualifier = condition.hasQualifier() ? @@ -2806,37 +2880,104 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null; TimeRange timeRange = condition.hasTimeRange() ? - ProtobufUtil.toTimeRange(condition.getTimeRange()) : - TimeRange.allTime(); - processed = - checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, - qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, - spaceQuotaEnforcement); - } else { + ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); + + boolean processed; + if (regionAction.hasAtomic() && regionAction.getAtomic()) { + // RowMutations + processed = + checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, + qualifier, op, comparator, filter, timeRange, regionActionResultBuilder, + spaceQuotaEnforcement); + } else { + if (regionAction.getActionList().isEmpty()) { + // If the region action list is empty, do nothing. + regionActionResultBuilder.setProcessed(true); + continue; + } + Action action = regionAction.getAction(0); + if (action.hasGet()) { + throw new DoNotRetryIOException("CheckAndMutate doesn't support GET=" + + action.getGet()); + } + MutationProto mutation = action.getMutation(); + switch (mutation.getMutateType()) { + case PUT: + Put put = ProtobufUtil.toPut(mutation, cellScanner); + checkCellSizeLimit(region, put); + // Throws an exception when violated + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + quota.addMutation(put); + + if (filter != null) { + processed = region.checkAndMutate(row, filter, timeRange, put); + } else { + processed = region.checkAndMutate(row, family, qualifier, op, comparator, + timeRange, put); + } + break; + + case DELETE: + Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + checkCellSizeLimit(region, delete); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); + quota.addMutation(delete); + + if (filter != null) { + processed = region.checkAndMutate(row, filter, timeRange, delete); + } else { + processed = region.checkAndMutate(row, family, qualifier, op, comparator, + timeRange, delete); + } + break; + + default: + throw new DoNotRetryIOException("CheckAndMutate doesn't support " + + mutation.getMutateType()); + } + + // To unify the response format with doNonAtomicRegionMutation and read through + // client's AsyncProcess we have to add an empty result instance per operation + regionActionResultBuilder.addResultOrException( + ClientProtos.ResultOrException.newBuilder().setIndex(0).build()); + } + regionActionResultBuilder.setProcessed(processed); + } catch (IOException e) { + rpcServer.getMetrics().exception(e); + // As it's an atomic operation with a condition, we may expect it's a global failure. + regionActionResultBuilder.setException(ResponseConverter.buildException(e)); + } + } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { + try { doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; + regionActionResultBuilder.setProcessed(true); + // We no longer use MultiResponse#processed. Instead, we use + // RegionActionResult#condition. This is for backward compatibility for old clients. + responseBuilder.setProcessed(true); + } catch (IOException e) { + rpcServer.getMetrics().exception(e); + // As it's atomic, we may expect it's a global failure. + regionActionResultBuilder.setException(ResponseConverter.buildException(e)); } - } catch (IOException e) { - rpcServer.getMetrics().exception(e); - // As it's atomic, we may expect it's a global failure. - regionActionResultBuilder.setException(ResponseConverter.buildException(e)); - } - } else { - // doNonAtomicRegionMutation manages the exception internally - if (context != null && closeCallBack == null) { - // An RpcCallBack that creates a list of scanners that needs to perform callBack - // operation on completion of multiGets. - // Set this only once - closeCallBack = new RegionScannersCloseCallBack(); - context.setCallBack(closeCallBack); - } - cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, + } else { + // doNonAtomicRegionMutation manages the exception internally + if (context != null && closeCallBack == null) { + // An RpcCallBack that creates a list of scanners that needs to perform callBack + // operation on completion of multiGets. + // Set this only once + closeCallBack = new RegionScannersCloseCallBack(); + context.setCallBack(closeCallBack); + } + cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, spaceQuotaEnforcement); + } + } finally { + quota.close(); } + responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); - quota.close(); ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); if(regionLoadStats != null) { regionStats.put(regionSpecifier, regionLoadStats); @@ -2847,10 +2988,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } - if (processed != null) { - responseBuilder.setProcessed(processed); - } - MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); for(Entry stat: regionStats.entrySet()){ builder.addRegion(stat.getKey()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index b9fb8119575..0de18929d7c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -246,9 +246,30 @@ public class TestAsyncTable { assertArrayEquals(IntStream.range(0, count).toArray(), actual); } + @Test + public void testMutateRow() throws InterruptedException, ExecutionException, IOException { + AsyncTable table = getTable.get(); + RowMutations mutation = new RowMutations(row); + mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); + table.mutateRow(mutation).get(); + Result result = table.get(new Get(row)).get(); + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); + + mutation = new RowMutations(row); + mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); + mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); + table.mutateRow(mutation).get(); + result = table.get(new Get(row)).get(); + assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); + } + + // Tests for old checkAndMutate API + @SuppressWarnings("FutureReturnValueIgnored") @Test - public void testCheckAndPut() throws InterruptedException, ExecutionException { + @Deprecated + public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); @@ -271,7 +292,8 @@ public class TestAsyncTable { @SuppressWarnings("FutureReturnValueIgnored") @Test - public void testCheckAndDelete() throws InterruptedException, ExecutionException { + @Deprecated + public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); @@ -307,27 +329,10 @@ public class TestAsyncTable { }); } - @Test - public void testMutateRow() throws InterruptedException, ExecutionException, IOException { - AsyncTable table = getTable.get(); - RowMutations mutation = new RowMutations(row); - mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); - table.mutateRow(mutation).get(); - Result result = table.get(new Get(row)).get(); - assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); - - mutation = new RowMutations(row); - mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); - mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); - table.mutateRow(mutation).get(); - result = table.get(new Get(row)).get(); - assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); - assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); - } - @SuppressWarnings("FutureReturnValueIgnored") @Test - public void testCheckAndMutate() throws InterruptedException, ExecutionException { + @Deprecated + public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException { AsyncTable table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); @@ -371,7 +376,8 @@ public class TestAsyncTable { } @Test - public void testCheckAndMutateWithTimeRange() throws Exception { + @Deprecated + public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception { AsyncTable table = getTable.get(); final long ts = System.currentTimeMillis() / 2; Put put = new Put(row); @@ -390,6 +396,7 @@ public class TestAsyncTable { assertTrue(ok); RowMutations rm = new RowMutations(row).add((Mutation) put); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) .ifEquals(VALUE).thenMutate(rm).get(); assertFalse(ok); @@ -410,7 +417,8 @@ public class TestAsyncTable { } @Test - public void testCheckAndMutateWithSingleFilter() throws Throwable { + @Deprecated + public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable { AsyncTable table = getTable.get(); // Put one row @@ -465,7 +473,8 @@ public class TestAsyncTable { } @Test - public void testCheckAndMutateWithMultipleFilters() throws Throwable { + @Deprecated + public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable { AsyncTable table = getTable.get(); // Put one row @@ -536,7 +545,8 @@ public class TestAsyncTable { } @Test - public void testCheckAndMutateWithTimestampFilter() throws Throwable { + @Deprecated + public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable { AsyncTable table = getTable.get(); // Put with specifying the timestamp @@ -569,7 +579,8 @@ public class TestAsyncTable { } @Test - public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + @Deprecated + public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable { AsyncTable table = getTable.get(); // Put with specifying the timestamp @@ -599,11 +610,678 @@ public class TestAsyncTable { } @Test(expected = NullPointerException.class) - public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { + @Deprecated + public void testCheckAndMutateWithoutConditionForOldApi() { getTable.get().checkAndMutate(row, FAMILY) .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); } + // Tests for new CheckAndMutate API + + @SuppressWarnings("FutureReturnValueIgnored") + @Test + public void testCheckAndPut() throws InterruptedException, ExecutionException { + AsyncTable table = getTable.get(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + int count = 10; + CountDownLatch latch = new CountDownLatch(count); + + IntStream.range(0, count) + .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + latch.countDown(); + })); + latch.await(); + assertEquals(1, successCount.get()); + String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); + assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Test + public void testCheckAndDelete() throws InterruptedException, ExecutionException { + AsyncTable table = getTable.get(); + int count = 10; + CountDownLatch putLatch = new CountDownLatch(count + 1); + table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); + IntStream.range(0, count) + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); + putLatch.await(); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + CountDownLatch deleteLatch = new CountDownLatch(count); + + IntStream.range(0, count) + .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .build( + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); + deleteLatch.await(); + assertEquals(1, successCount.get()); + Result result = table.get(new Get(row)).get(); + IntStream.range(0, count).forEach(i -> { + if (i == successIndex.get()) { + assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); + } else { + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); + } + }); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Test + public void testCheckAndMutate() throws InterruptedException, ExecutionException { + AsyncTable table = getTable.get(); + int count = 10; + CountDownLatch putLatch = new CountDownLatch(count + 1); + table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); + IntStream.range(0, count) + .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) + .thenRun(() -> putLatch.countDown())); + putLatch.await(); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger successIndex = new AtomicInteger(-1); + CountDownLatch mutateLatch = new CountDownLatch(count); + IntStream.range(0, count).forEach(i -> { + RowMutations mutation = new RowMutations(row); + try { + mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); + mutation + .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .build(mutation)) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + mutateLatch.countDown(); + }); + }); + mutateLatch.await(); + assertEquals(1, successCount.get()); + Result result = table.get(new Get(row)).get(); + IntStream.range(0, count).forEach(i -> { + if (i == successIndex.get()) { + assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); + } else { + assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); + } + }); + } + + @Test + public void testCheckAndMutateWithTimeRange() throws Exception { + AsyncTable table = getTable.get(); + final long ts = System.currentTimeMillis() / 2; + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, ts, VALUE); + + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(put)).get(); + assertTrue(ok); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts + 10000)) + .build(put)).get(); + assertFalse(ok); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts)) + .build(put)).get(); + assertTrue(ok); + + RowMutations rm = new RowMutations(row).add((Mutation) put); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts + 10000)) + .build(rm)).get(); + assertFalse(ok); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts)) + .build(rm)).get(); + assertTrue(ok); + + Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts + 10000)) + .build(delete)).get(); + assertFalse(ok); + + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, QUALIFIER, VALUE) + .timeRange(TimeRange.at(ts)) + .build(delete)).get(); + assertTrue(ok); + } + + @Test + public void testCheckAndMutateWithSingleFilter() throws Throwable { + AsyncTable table = getTable.get(); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put).get(); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); + + // Delete with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get(); + assertTrue(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); + + // Mutate with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get(); + assertTrue(ok); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + } + + @Test + public void testCheckAndMutateWithMultipleFilters() throws Throwable { + AsyncTable table = getTable.get(); + + // Put one row + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); + put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); + put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); + table.put(put).get(); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); + + // Delete with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get(); + assertTrue(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); + + // Mutate with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get(); + assertTrue(ok); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + } + + @Test + public void testCheckAndMutateWithTimestampFilter() throws Throwable { + AsyncTable table = getTable.get(); + + // Put with specifying the timestamp + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(100L)))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(101L)))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + } + + @Test + public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + AsyncTable table = getTable.get(); + + // Put with specifying the timestamp + table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))) + .get(); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); + assertTrue(ok); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(row) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))) + .get(); + assertFalse(ok); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + } + + // Tests for batch version of checkAndMutate + + @Test + public void testCheckAndMutateBatch() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")) + .build(new Delete(row)); + + checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) + .build(new Delete(row2)); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.newBuilder(row3) + .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) + .build(new RowMutations(row3) + .add((Mutation) new Put(row3) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); + + checkAndMutate2 = CheckAndMutate.newBuilder(row4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) + .build(new RowMutations(row4) + .add((Mutation) new Put(row4) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row3)).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); + + result = table.get(new Get(row4)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + + @Test + public void testCheckAndMutateBatch2() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); + byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get(); + + // Test for ifNotExists() + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, Bytes.toBytes("B")) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifNotExists(FAMILY, Bytes.toBytes("B")) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for ifMatches() + checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for timeRange() + checkAndMutate1 = CheckAndMutate.newBuilder(row3) + .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(row4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + + @Test + public void testCheckAndMutateBatchWithFilter() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + + table.putAll(Arrays.asList( + new Put(row) + .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row2) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row) + .add((Mutation) new Put(row) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); + + checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(row2) + .add((Mutation) new Put(row2) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) + .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + result = table.get(new Get(row)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2)).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + + @Test + public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { + AsyncTable table = getTable.get(); + byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); + + table.putAll(Arrays.asList( + new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get(); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("d")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("e")))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + List results = + table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); + + assertTrue(results.get(0)); + assertFalse(results.get(1)); + + Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + @Test public void testDisabled() throws InterruptedException, ExecutionException { ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 42e61d7456e..ac82314b607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -333,4 +334,57 @@ public class TestAsyncTableBatch { assertThat(e.getMessage(), containsString("KeyValue size too large")); } } + + @Test + public void testWithCheckAndMutate() throws Exception { + AsyncTable table = tableGetter.apply(TABLE_NAME); + + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] row5 = Bytes.toBytes("row5"); + + table.putAll(Arrays.asList( + new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); + RowMutations mutations = new RowMutations(row3) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); + Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); + + List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put); + List results = table.batchAll(actions).get(); + + assertTrue(((Result) results.get(0)).getExists()); + assertEquals("b", + Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); + assertTrue(((Result) results.get(2)).getExists()); + assertFalse(((Result) results.get(3)).getExists()); + assertTrue(((Result) results.get(4)).isEmpty()); + + Result result = table.get(new Get(row1)).get(); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row3)).get(); + assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + result = table.get(new Get(row4)).get(); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(row5)).get(); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java index 934c09c7e0f..67dad1958ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -55,6 +57,9 @@ public class TestCheckAndMutate { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final byte[] ROWKEY = Bytes.toBytes("12345"); + private static final byte[] ROWKEY2 = Bytes.toBytes("67890"); + private static final byte[] ROWKEY3 = Bytes.toBytes("abcde"); + private static final byte[] ROWKEY4 = Bytes.toBytes("fghij"); private static final byte[] FAMILY = Bytes.toBytes("cf"); @Rule @@ -129,8 +134,11 @@ public class TestCheckAndMutate { return rm; } + // Tests for old checkAndMutate API + @Test - public void testCheckAndMutate() throws Throwable { + @Deprecated + public void testCheckAndMutateForOldApi() throws Throwable { try (Table table = createTable()) { // put one row putOneRow(table); @@ -163,40 +171,8 @@ public class TestCheckAndMutate { } @Test - public void testCheckAndMutateWithBuilder() throws Throwable { - try (Table table = createTable()) { - // put one row - putOneRow(table); - // get row back and assert the values - getOneRowAndAssertAllExist(table); - - // put the same row again with C column deleted - RowMutations rm = makeRowMutationsWithColumnCDeleted(); - boolean res = table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A")) - .ifEquals(Bytes.toBytes("a")).thenMutate(rm); - assertTrue(res); - - // get row back and assert the values - getOneRowAndAssertAllButCExist(table); - - //Test that we get a region level exception - try { - rm = getBogusRowMutations(); - table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A")) - .ifEquals(Bytes.toBytes("a")).thenMutate(rm); - fail("Expected NoSuchColumnFamilyException"); - } catch (RetriesExhaustedWithDetailsException e) { - try { - throw e.getCause(0); - } catch (NoSuchColumnFamilyException e1) { - // expected - } - } - } - } - - @Test - public void testCheckAndMutateWithSingleFilter() throws Throwable { + @Deprecated + public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable { try (Table table = createTable()) { // put one row putOneRow(table); @@ -245,7 +221,8 @@ public class TestCheckAndMutate { } @Test - public void testCheckAndMutateWithMultipleFilters() throws Throwable { + @Deprecated + public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable { try (Table table = createTable()) { // put one row putOneRow(table); @@ -310,7 +287,8 @@ public class TestCheckAndMutate { } @Test - public void testCheckAndMutateWithTimestampFilter() throws Throwable { + @Deprecated + public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable { try (Table table = createTable()) { // Put with specifying the timestamp table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); @@ -341,7 +319,8 @@ public class TestCheckAndMutate { } @Test - public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + @Deprecated + public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable { try (Table table = createTable()) { // Put with specifying the timestamp table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); @@ -368,10 +347,533 @@ public class TestCheckAndMutate { } @Test(expected = NullPointerException.class) - public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { + @Deprecated + public void testCheckAndMutateWithoutConditionForOldApi() throws Throwable { try (Table table = createTable()) { table.checkAndMutate(ROWKEY, FAMILY) .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); } } + + // Tests for new CheckAndMutate API + + @Test + public void testCheckAndMutate() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // put the same row again with C column deleted + RowMutations rm = makeRowMutationsWithColumnCDeleted(); + boolean res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(rm)); + assertTrue(res); + + // get row back and assert the values + getOneRowAndAssertAllButCExist(table); + + // Test that we get a region level exception + try { + rm = getBogusRowMutations(); + table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(rm)); + fail("Expected NoSuchColumnFamilyException"); + } catch (RetriesExhaustedWithDetailsException e) { + try { + throw e.getCause(0); + } catch (NoSuchColumnFamilyException e1) { + // expected + } + } + } + } + + @Test + public void testCheckAndMutateWithSingleFilter() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, + Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a"))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E")))); + + // Delete with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")))); + assertTrue(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")))); + + // Mutate with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), + CompareOperator.EQUAL, Bytes.toBytes("b"))) + .build(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))))); + assertTrue(ok); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + } + } + + @Test + public void testCheckAndMutateWithMultipleFilters() throws Throwable { + try (Table table = createTable()) { + // put one row + putOneRow(table); + // get row back and assert the values + getOneRowAndAssertAllExist(table); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("c")))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E")))); + + // Delete with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")))); + assertTrue(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")))); + + // Mutate with success + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))))); + assertTrue(ok); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + } + } + + @Test + public void testCheckAndMutateWithTimestampFilter() throws Throwable { + try (Table table = createTable()) { + // Put with specifying the timestamp + table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(100L)))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), + new TimestampsFilter(Collections.singletonList(101L)))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + } + } + + @Test + public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { + try (Table table = createTable()) { + // Put with specifying the timestamp + table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))); + + // Put with success + boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, + Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))); + assertTrue(ok); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Put with failure + ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), + CompareOperator.EQUAL, Bytes.toBytes("a"))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))); + assertFalse(ok); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + } + } + + @Test(expected = IllegalStateException.class) + public void testCheckAndMutateBuilderWithoutCondition() { + CheckAndMutate.newBuilder(ROWKEY) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); + } + + // Tests for batch version of checkAndMutate + + @Test + public void testCheckAndMutateBatch() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")) + .build(new Delete(ROWKEY)); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) + .build(new Delete(ROWKEY2)); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3) + .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) + .build(new RowMutations(ROWKEY3) + .add((Mutation) new Put(ROWKEY3) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(ROWKEY3).addColumns(FAMILY, Bytes.toBytes("C")))); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) + .build(new RowMutations(ROWKEY4) + .add((Mutation) new Put(ROWKEY4) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) + .add((Mutation) new Delete(ROWKEY4).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY3)); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); + + result = table.get(new Get(ROWKEY4)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + } + + @Test + public void testCheckAndMutateBatch2() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), + new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))); + + // Test for ifNotExists() + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifNotExists(FAMILY, Bytes.toBytes("B")) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifNotExists(FAMILY, Bytes.toBytes("B")) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for ifMatches() + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))); + assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"))); + assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); + + // Test for timeRange() + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3) + .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"))); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + } + } + + @Test + public void testCheckAndMutateBatchWithFilter() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY2) + .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))); + + // Test for Put + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for Delete + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("C"))); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new Delete(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + // Test for RowMutations + checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(ROWKEY) + .add((Mutation) new Put(ROWKEY) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) + .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))); + + checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .build(new RowMutations(ROWKEY2) + .add((Mutation) new Put(ROWKEY2) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) + .add((Mutation) new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("D")))); + + results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + result = table.get(new Get(ROWKEY)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); + assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2)); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + } + + @Test + public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { + try (Table table = createTable()) { + table.put(Arrays.asList( + new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) + .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) + .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) + .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) + .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, + Bytes.toBytes("a")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, + Bytes.toBytes("b")))) + .timeRange(TimeRange.between(0, 101)) + .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); + + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2) + .ifMatches(new FilterList( + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, + Bytes.toBytes("d")), + new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, + Bytes.toBytes("e")))) + .timeRange(TimeRange.between(0, 100)) + .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); + + boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2)); + + assertTrue(results[0]); + assertFalse(results[1]); + + Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); + + result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index fdf21f99fa3..ffe1fb7d174 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -484,6 +484,60 @@ public class TestFromClientSide3 { } } + @Test + public void testBatchWithCheckAndMutate() throws Exception { + try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] row5 = Bytes.toBytes("row5"); + + table.put(Arrays.asList( + new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), + new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), + new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), + new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), + new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))); + + CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) + .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) + .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g"))); + Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); + RowMutations mutations = new RowMutations(row3) + .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) + .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))); + CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) + .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) + .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); + Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); + + List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put); + Object[] results = new Object[actions.size()]; + table.batch(actions, results); + + assertTrue(((Result) results[0]).getExists()); + assertEquals("b", + Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); + assertTrue(((Result) results[2]).getExists()); + assertFalse(((Result) results[3]).getExists()); + assertTrue(((Result) results[4]).isEmpty()); + + Result result = table.get(new Get(row1)); + assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); + + result = table.get(new Get(row3)); + assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); + + result = table.get(new Get(row4)); + assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); + + result = table.get(new Get(row5)); + assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); + } + } + @Test public void testHTableExistsMethodSingleRegionSingleGet() throws IOException, InterruptedException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index 43d72e97184..c23bc7d7dca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -256,9 +256,8 @@ public class TestMalformedCellFromClient { actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - ClientProtos.MultiRequest request = - ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(condition).build(); + ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder() + .addRegionAction(builder.setCondition(condition).build()).build(); return request; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java index 942e232afb2..9d940808eac 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -434,6 +435,16 @@ public class ThriftTable implements Table { throw new NotImplementedException("Implement later"); } + @Override + public boolean checkAndMutate(CheckAndMutate checkAndMutate) { + throw new NotImplementedException("Implement later"); + } + + @Override + public boolean[] checkAndMutate(List checkAndMutates) { + throw new NotImplementedException("Implement later"); + } + @Override public void mutateRow(RowMutations rm) throws IOException { TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);