HBASE-25242 Add Increment/Append support to RowMutations (#2711)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Toshihiro Suzuki 2020-11-27 03:53:19 +09:00 committed by GitHub
parent 1dbfe96d69
commit 3775464981
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1099 additions and 300 deletions

View File

@ -396,9 +396,9 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
* @return A {@link CompletableFuture} that always returns null when complete normally.
* @return A {@link CompletableFuture} that returns results of Increment/Append operations
*/
CompletableFuture<Void> mutateRow(RowMutations mutation);
CompletableFuture<Result> mutateRow(RowMutations mutation);
/**
* The scan API uses the observer pattern.

View File

@ -217,7 +217,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
public CompletableFuture<Result> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
}

View File

@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Objects;
/**
* Used to perform CheckAndMutate operations.
@ -58,7 +55,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class CheckAndMutate extends Mutation {
public final class CheckAndMutate implements Row {
/**
* A builder class for building a CheckAndMutate object.
@ -200,15 +197,15 @@ public final class CheckAndMutate extends Mutation {
}
/**
* @param mutation mutations to perform if check succeeds
* @param mutations mutations to perform if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(RowMutations mutation) {
preCheck(mutation);
public CheckAndMutate build(RowMutations mutations) {
preCheck(mutations);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, mutation);
return new CheckAndMutate(row, filter, timeRange, mutations);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutations);
}
}
}
@ -223,6 +220,7 @@ public final class CheckAndMutate extends Mutation {
return new Builder(row);
}
private final byte[] row;
private final byte[] family;
private final byte[] qualifier;
private final CompareOperator op;
@ -233,7 +231,7 @@ public final class CheckAndMutate extends Mutation {
private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
byte[] value, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = family;
this.qualifier = qualifier;
this.op = op;
@ -244,7 +242,7 @@ public final class CheckAndMutate extends Mutation {
}
private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
this.row = row;
this.family = null;
this.qualifier = null;
this.op = null;
@ -254,6 +252,37 @@ public final class CheckAndMutate extends Mutation {
this.action = action;
}
/**
* @return the row
*/
@Override
public byte[] getRow() {
return row;
}
@Override
public int compareTo(Row row) {
return Bytes.compareTo(this.getRow(), row.getRow());
}
// Added to get rid of the stopbugs error
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Row other = (Row) obj;
return compareTo(other) == 0;
}
@Override
public int hashCode() {
return Bytes.hashCode(this.getRow());
}
/**
* @return the family to check
*/
@ -309,76 +338,4 @@ public final class CheckAndMutate extends Mutation {
public Row getAction() {
return action;
}
@Override
public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
if (action instanceof Mutation) {
return ((Mutation) action).getFamilyCellMap();
}
throw new UnsupportedOperationException();
}
@Override
public long getTimestamp() {
if (action instanceof Mutation) {
return ((Mutation) action).getTimestamp();
}
throw new UnsupportedOperationException();
}
@Override
public Mutation setTimestamp(long timestamp) {
if (action instanceof Mutation) {
return ((Mutation) action).setTimestamp(timestamp);
}
throw new UnsupportedOperationException();
}
@Override
public Durability getDurability() {
if (action instanceof Mutation) {
return ((Mutation) action).getDurability();
}
throw new UnsupportedOperationException();
}
@Override
public Mutation setDurability(Durability d) {
if (action instanceof Mutation) {
return ((Mutation) action).setDurability(d);
}
throw new UnsupportedOperationException();
}
@Override
public byte[] getAttribute(String name) {
if (action instanceof Mutation) {
return ((Mutation) action).getAttribute(name);
}
throw new UnsupportedOperationException();
}
@Override
public OperationWithAttributes setAttribute(String name, byte[] value) {
if (action instanceof Mutation) {
return ((Mutation) action).setAttribute(name, value);
}
throw new UnsupportedOperationException();
}
@Override
public int getPriority() {
if (action instanceof Mutation) {
return ((Mutation) action).getPriority();
}
return ((RowMutations) action).getMaxPriority();
}
@Override
public OperationWithAttributes setPriority(int priority) {
if (action instanceof Mutation) {
return ((Mutation) action).setPriority(priority);
}
throw new UnsupportedOperationException();
}
}

View File

@ -554,7 +554,7 @@ public class HTable implements Table {
}
@Override
public void mutateRow(final RowMutations rm) throws IOException {
public Result mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs,
@ -578,6 +578,7 @@ public class HTable implements Table {
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
};
Object[] results = new Object[rm.getMutations().size()];
AsyncProcessTask task = AsyncProcessTask.newBuilder()
.setPool(pool)
.setTableName(tableName)
@ -586,12 +587,14 @@ public class HTable implements Table {
.setRpcTimeout(writeRpcTimeoutMs)
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.setResults(results)
.build();
AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
return (Result) results[0];
}
@Override

View File

@ -551,17 +551,17 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) ->
this.<Result, Void> mutateRow(controller, loc, stub, mutation,
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter
.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
.build();
}, resp -> null))
}, resp -> resp))
.call();
}

View File

@ -30,7 +30,6 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
/**
* Performs multiple mutations atomically on a single row.
* Currently {@link Put} and {@link Delete} are supported.
*
* The mutations are performed in the order in which they
* were added.
@ -110,15 +109,13 @@ public class RowMutations implements Row {
}
/**
* Currently only supports {@link Put} and {@link Delete} mutations.
*
* @param mutations The data to send.
* @throws IOException if the row of added mutation doesn't match the original row
*/
public RowMutations add(List<? extends Mutation> mutations) throws IOException {
for (Mutation mutation : mutations) {
if (!Bytes.equals(row, mutation.getRow())) {
throw new WrongRowIOException("The row in the recently added Put/Delete <" +
throw new WrongRowIOException("The row in the recently added Mutation <" +
Bytes.toStringBinary(mutation.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}

View File

@ -644,9 +644,10 @@ public interface Table extends Closeable {
* {@link Put} and {@link Delete} are supported.
*
* @param rm object that specifies the set of mutations to perform atomically
* @throws IOException
* @return results of Increment/Append operations
* @throws IOException if a remote or network exception occurs.
*/
default void mutateRow(final RowMutations rm) throws IOException {
default Result mutateRow(final RowMutations rm) throws IOException {
throw new NotImplementedException("Add an implementation!");
}

View File

@ -3570,9 +3570,13 @@ public final class ProtobufUtil {
return builder.build((Put) m);
} else if (m instanceof Delete) {
return builder.build((Delete) m);
} else if (m instanceof Increment) {
return builder.build((Increment) m);
} else if (m instanceof Append) {
return builder.build((Append) m);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
.getClass().getSimpleName().toUpperCase());
throw new DoNotRetryIOException("Unsupported mutate type: " + m.getClass()
.getSimpleName().toUpperCase());
}
} else {
return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));

View File

@ -271,17 +271,9 @@ public final class RequestConverter {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
@ -387,17 +379,9 @@ public final class RequestConverter {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
@ -928,17 +912,9 @@ public final class RequestConverter {
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
for (Mutation mutation: rowMutations.getMutations()) {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else if (mutation instanceof Delete) {
type = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
mutationBuilder);
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
@ -946,7 +922,6 @@ public final class RequestConverter {
}
private static MutationType getMutationType(Mutation mutation) {
assert !(mutation instanceof CheckAndMutate);
if (mutation instanceof Put) {
return MutationType.PUT;
} else if (mutation instanceof Delete) {

View File

@ -148,8 +148,6 @@ public final class ResponseConverter {
actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
}
Object responseValue;
// For RowMutations/CheckAndMutate action, if there is an exception, the exception is set
// at the RegionActionResult level and the ResultOrException is null at the original index
Integer index = (indexMap == null ? null : indexMap.get(i));
@ -158,23 +156,10 @@ public final class ResponseConverter {
// If there is an exception from the server, the exception is set at
// the RegionActionResult level, which has been handled above.
if (actions.hasCondition()) {
Result result = null;
if (actionResult.getResultOrExceptionCount() > 0) {
ResultOrException roe = actionResult.getResultOrException(0);
if (roe.hasResult()) {
Result r = ProtobufUtil.toResult(roe.getResult(), cells);
if (!r.isEmpty()) {
result = r;
}
}
}
responseValue = new CheckAndMutateResult(actionResult.getProcessed(), result);
results.add(regionName, index, getCheckAndMutateResult(actionResult, cells));
} else {
responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
results.add(regionName, index, getMutateRowResult(actionResult, cells));
}
results.add(regionName, index, responseValue);
continue;
}
@ -185,11 +170,25 @@ public final class ResponseConverter {
if (!r.isEmpty()) {
result = r;
}
responseValue = new CheckAndMutateResult(actionResult.getProcessed(), result);
results.add(regionName, roe.getIndex(), responseValue);
results.add(regionName, roe.getIndex(),
new CheckAndMutateResult(actionResult.getProcessed(), result));
}
} else {
if (actionResult.hasProcessed()) {
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
Result result = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE : ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
Result r = ProtobufUtil.toResult(roe.getResult(), cells);
if (!r.isEmpty()) {
r.setExists(true);
result = r;
}
results.add(regionName, roe.getIndex(), result);
}
continue;
}
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
Object responseValue;
if (roe.hasException()) {
responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) {
@ -197,12 +196,7 @@ public final class ResponseConverter {
} else if (roe.hasServiceResult()) {
responseValue = roe.getServiceResult();
} else {
// Sometimes, the response is just "it was processed". Generally, this occurs for things
// like mutateRows where either we get back 'processed' (or not) and optionally some
// statistics about the regions we touched.
responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
responseValue = ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE;
}
results.add(regionName, roe.getIndex(), responseValue);
}
@ -219,6 +213,47 @@ public final class ResponseConverter {
return results;
}
private static CheckAndMutateResult getCheckAndMutateResult(RegionActionResult actionResult,
CellScanner cells) throws IOException {
Result result = null;
if (actionResult.getResultOrExceptionCount() > 0) {
// Get the result of the Increment/Append operations from the first element of the
// ResultOrException list
ResultOrException roe = actionResult.getResultOrException(0);
if (roe.hasResult()) {
Result r = ProtobufUtil.toResult(roe.getResult(), cells);
if (!r.isEmpty()) {
result = r;
}
}
}
return new CheckAndMutateResult(actionResult.getProcessed(), result);
}
private static Result getMutateRowResult(RegionActionResult actionResult, CellScanner cells)
throws IOException {
if (actionResult.getProcessed()) {
Result result = null;
if (actionResult.getResultOrExceptionCount() > 0) {
// Get the result of the Increment/Append operations from the first element of the
// ResultOrException list
ResultOrException roe = actionResult.getResultOrException(0);
Result r = ProtobufUtil.toResult(roe.getResult(), cells);
if (!r.isEmpty()) {
r.setExists(true);
result = r;
}
}
if (result != null) {
return result;
} else {
return ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE;
}
} else {
return ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
}
}
/**
* Create a CheckAndMutateResult object from a protocol buffer MutateResponse
*

View File

@ -875,7 +875,7 @@ public class RemoteHTable implements Table {
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
public Result mutateRow(RowMutations rm) throws IOException {
throw new IOException("atomicMutation not supported");
}

View File

@ -919,7 +919,7 @@ public interface RegionObserver {
/**
* Called after checkAndMutate
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* Note: Do not retain references to any Cells in actions beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param checkAndMutate the CheckAndMutate object

View File

@ -3454,8 +3454,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
this.checkAndPrepareMutation(mutation, timestamp);
if (mutation instanceof Put || mutation instanceof Delete) {
// store the family map reference to allow for mutations
familyCellMaps[index] = mutation.getFamilyCellMap();
}
// store durability for the batch (highest durability of all operations in the batch)
Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
@ -3846,33 +3849,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Bytes.toBytes(timestamp));
miniBatchOp.incrementNumOfDeletes();
} else if (mutation instanceof Increment || mutation instanceof Append) {
// For nonce operations
canProceed[index] = startNonceOperation(nonceGroup, nonce);
if (!canProceed[index]) {
// convert duplicate increment/append to get
List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
Result.create(results));
return true;
}
boolean returnResults;
if (mutation instanceof Increment) {
returnResults = ((Increment) mutation).isReturnResults();
miniBatchOp.incrementNumOfIncrements();
} else {
returnResults = ((Append) mutation).isReturnResults();
miniBatchOp.incrementNumOfAppends();
}
Result result = doCoprocessorPreCallAfterRowLock(mutation);
// For nonce operations
canProceed[index] = startNonceOperation(nonceGroup, nonce);
if (!canProceed[index]) {
Result result;
if (returnResults) {
// convert duplicate increment/append to get
List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
result = Result.create(results);
} else {
result = Result.EMPTY_RESULT;
}
retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result);
return true;
}
Result result = null;
if (region.coprocessorHost != null) {
if (mutation instanceof Increment) {
result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
} else {
result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
}
}
if (result != null) {
retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
returnResults ? result : Result.EMPTY_RESULT);
return true;
}
List<Cell> results = returnResults ? new ArrayList<>(mutation.size()) : null;
familyCellMaps[index] = reckonDeltas(mutation, results, timestamp);
this.results[index] = results != null ? Result.create(results) : Result.EMPTY_RESULT;
if (mutation instanceof Increment) {
miniBatchOp.incrementNumOfIncrements();
} else {
miniBatchOp.incrementNumOfAppends();
}
}
region.rewriteCellTags(familyCellMaps[index], mutation);
@ -3954,28 +3975,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return get;
}
/**
* Do coprocessor pre-increment or pre-append after row lock call.
* @return Result returned out of the coprocessor, which means bypass all further processing
* and return the preferred Result instead, or null which means proceed.
*/
private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws IOException {
assert mutation instanceof Increment || mutation instanceof Append;
Result result = null;
if (region.coprocessorHost != null) {
if (mutation instanceof Increment) {
result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
} else {
result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
}
}
return result;
}
private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results,
long now) throws IOException {
assert mutation instanceof Increment || mutation instanceof Append;
Map<byte[], List<Cell>> ret = new HashMap<>();
Map<byte[], List<Cell>> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
// Process a Store/family at a time.
for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
final byte[] columnFamilyName = entry.getKey();
@ -4018,14 +4021,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
// Sort the cells so that they match the order that they appear in the Get results.
// Otherwise, we won't be able to find the existing values if the cells are not specified
// in order by the client since cells are in an array list.
sort(deltas, store.getComparator());
// Get previous values for all columns in this family.
Get get = new Get(mutation.getRow());
for (Cell cell: deltas) {
get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
}
TimeRange tr;
if (mutation instanceof Increment) {
tr = ((Increment) mutation).getTimeRange();
} else {
tr = ((Append) mutation).getTimeRange();
}
List<Cell> currentValues = get(mutation, store, deltas, tr);
if (tr != null) {
get.setTimeRange(tr.getMin(), tr.getMax());
}
List<Cell> currentValues = region.get(get, false);
// Iterate the input columns and update existing values if they were found, otherwise
// add new column initialized to the delta amount
@ -4119,31 +4136,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return PrivateCellUtil.getValueAsLong(cell);
}
/**
* Do a specific Get on passed <code>columnFamily</code> and column qualifiers.
* @param mutation Mutation we are doing this Get for.
* @param store Which column family on row (TODO: Go all Gets in one go)
* @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
* @return Return list of Cells found.
*/
private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
TimeRange tr) throws IOException {
// Sort the cells so that they match the order that they appear in the Get results.
// Otherwise, we won't be able to find the existing values if the cells are not specified
// in order by the client since cells are in an array list.
// TODO: I don't get why we are sorting. St.Ack 20150107
sort(coordinates, store.getComparator());
Get get = new Get(mutation.getRow());
for (Cell cell: coordinates) {
get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));
}
// Increments carry time range. If an Increment instance, put it on the Get.
if (tr != null) {
get.setTimeRange(tr.getMin(), tr.getMax());
}
return region.get(get, false);
}
@Override
public List<Pair<NonceKey, WALEdit>> buildWALEdits(final MiniBatchOperationInProgress<Mutation>
miniBatchOp) throws IOException {
@ -4318,6 +4310,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
// TODO Support Increment/Append operations
private void checkAndMergeCPMutations(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
final List<RowLock> acquiredRowLocks, final long timestamp) throws IOException {
visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> {
@ -4527,7 +4520,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Perform a batch of mutations.
*
* It supports Put, Delete, Increment, Append mutations and will ignore other types passed.
* Operations in a batch are stored with highest durability specified of for all operations in a
* batch, except for {@link Durability#SKIP_WAL}.
*
@ -4885,11 +4877,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// timestamp from get (see prepareDeleteTimestamps).
}
// All edits for the given row (across all column families) must happen atomically.
Result r = null;
Result r;
if (mutation != null) {
r = doBatchMutate(mutation, true).getResult();
} else {
mutateRow(rowMutations);
r = mutateRow(rowMutations);
}
this.checkAndMutateChecksPassed.increment();
return new CheckAndMutateResult(true, r);
@ -8250,11 +8242,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
// Don't need nonces here - RowMutations only supports puts and deletes
public Result mutateRow(RowMutations rm) throws IOException {
final List<Mutation> m = rm.getMutations();
batchMutate(m.toArray(new Mutation[m.size()]), true, HConstants.NO_NONCE,
HConstants.NO_NONCE);
OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
HConstants.NO_NONCE, HConstants.NO_NONCE);
List<Result> results = new ArrayList<>();
for (OperationStatus status : statuses) {
if (status.getResult() != null) {
results.add(status.getResult());
}
}
if (results.isEmpty()) {
return null;
}
// Merge the results of the Increment/Append operations
List<Cell> cells = new ArrayList<>();
for (Result result : results) {
if (result.rawCells() != null) {
cells.addAll(Arrays.asList(result.rawCells()));
}
}
return Result.create(cells);
}
/**

View File

@ -124,6 +124,7 @@ public class MiniBatchOperationInProgress<T> {
* in the same batch. These mutations are applied to the WAL and applied to the memstore as well.
* The timestamp of the cells in the given Mutations MUST be obtained from the original mutation.
* <b>Note:</b> The durability from CP will be replaced by the durability of corresponding mutation.
* <b>Note:</b> Currently only supports Put and Delete operations.
* @param index the index that corresponds to the original mutation index in the batch
* @param newOperations the Mutations to add
*/

View File

@ -620,8 +620,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
mutations.add(del);
break;
case INCREMENT:
Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, increment);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
mutations.add(increment);
break;
case APPEND:
Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, append);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
mutations.add(append);
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
throw new DoNotRetryIOException("invalid mutation type : " + type);
}
}
@ -914,7 +928,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
/**
* Execute a list of Put/Delete mutations.
* Execute a list of mutations.
*
* @param builder
* @param region
@ -944,12 +958,27 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
MutationProto m = action.getMutation();
Mutation mutation;
if (m.getMutateType() == MutationType.PUT) {
switch (m.getMutateType()) {
case PUT:
mutation = ProtobufUtil.toPut(m, cells);
batchContainsPuts = true;
} else {
break;
case DELETE:
mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
break;
case INCREMENT:
mutation = ProtobufUtil.toIncrement(m, cells);
break;
case APPEND:
mutation = ProtobufUtil.toAppend(m, cells);
break;
default:
throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
}
mutationActionMap.put(mutation, action);
mArray[i++] = mutation;
@ -972,11 +1001,51 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
HConstants.NO_NONCE);
// When atomic is true, it indicates that the mutateRow API or the batch API with
// RowMutations is called. In this case, we need to merge the results of the
// Increment/Append operations if the mutations include those operations, and set the merged
// result to the first element of the ResultOrException list
if (atomic) {
List<ResultOrException> resultOrExceptions = new ArrayList<>();
List<Result> results = new ArrayList<>();
for (i = 0; i < codes.length; i++) {
if (codes[i].getResult() != null) {
results.add(codes[i].getResult());
}
if (i != 0) {
resultOrExceptions.add(getResultOrException(
ClientProtos.Result.getDefaultInstance(), i));
}
}
if (results.isEmpty()) {
builder.addResultOrException(getResultOrException(
ClientProtos.Result.getDefaultInstance(), 0));
} else {
// Merge the results of the Increment/Append operations
List<Cell> cellList = new ArrayList<>();
for (Result result : results) {
if (result.rawCells() != null) {
cellList.addAll(Arrays.asList(result.rawCells()));
}
}
Result result = Result.create(cellList);
// Set the merged result of the Increment/Append operations to the first element of the
// ResultOrException list
builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
}
builder.addAllResultOrException(resultOrExceptions);
return;
}
for (i = 0; i < codes.length; i++) {
Mutation currentMutation = mArray[i];
ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
Exception e = null;
int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
Exception e;
switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY:
e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
@ -2738,6 +2807,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionActionResultBuilder.setProcessed(result.isSuccess());
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
// Set the result of the Increment/Append operations to the first element of the
// ResultOrException list
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
.setResult(ProtobufUtil.toResult(result.getResult())).build());
@ -2761,7 +2832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
cellScanner, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(true);
// We no longer use MultiResponse#processed. Instead, we use
// RegionActionResult#condition. This is for backward compatibility for old clients.
// RegionActionResult#processed. This is for backward compatibility for old clients.
responseBuilder.setProcessed(true);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);

View File

@ -290,8 +290,9 @@ public interface Region extends ConfigurationObserver {
/**
* Perform a batch of mutations.
* <p>
* Note this supports only Put, Delete, Increment and Append mutations and will ignore other
* types passed.
* Please do not operate on a same column of a single row in a batch, we will not consider the
* previous operation in the same batch when performing the operations in the batch.
*
* @param mutations the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
@ -528,13 +529,14 @@ public interface Region extends ConfigurationObserver {
Result increment(Increment increment) throws IOException;
/**
* Performs multiple mutations atomically on a single row. Currently
* {@link Put} and {@link Delete} are supported.
* Performs multiple mutations atomically on a single row.
*
* @param mutations object that specifies the set of mutations to perform atomically
* @return results of Increment/Append operations. If no Increment/Append operations, it returns
* null
* @throws IOException
*/
void mutateRow(RowMutations mutations) throws IOException;
Result mutateRow(RowMutations mutations) throws IOException;
/**
* Perform atomic mutations within the region.

View File

@ -250,18 +250,29 @@ public class TestAsyncTable {
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = getTable.get();
RowMutations mutation = new RowMutations(row);
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
table.mutateRow(mutation).get();
Result result = table.get(new Get(row)).get();
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
Result result = table.mutateRow(mutation).get();
assertTrue(result.getExists());
assertTrue(result.isEmpty());
result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
table.mutateRow(mutation).get();
mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
result = table.mutateRow(mutation).get();
assertTrue(result.getExists());
assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
}
// Tests for old checkAndMutate API
@ -995,7 +1006,7 @@ public class TestAsyncTable {
public void testCheckAndIncrement() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
// CheckAndIncrement with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
@ -1052,7 +1063,7 @@ public class TestAsyncTable {
public void testCheckAndAppend() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
// CheckAndAppend with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
@ -1105,6 +1116,66 @@ public class TestAsyncTable {
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndRowMutations() throws Throwable {
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
AsyncTable<?> table = getTable.get();
// Initial values
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
// Do CheckAndRowMutations
CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
new Delete(row).addColumns(FAMILY, q2),
new Increment(row).addColumn(FAMILY, q3, 1),
new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
assertTrue(result.isSuccess());
assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
// Verify the value
Result r = table.get(new Get(row)).get();
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
// Do CheckAndRowMutations again
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Delete(row).addColumns(FAMILY, q1),
new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
new Increment(row).addColumn(FAMILY, q3, 1),
new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
result = table.checkAndMutate(checkAndMutate).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
// Verify the value
r = table.get(new Get(row)).get();
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
}
// Tests for batch version of checkAndMutate
@Test
@ -1512,6 +1583,65 @@ public class TestAsyncTable {
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndRowMutationsBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
.addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
).get();
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.build(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
)));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
.build(new RowMutations(row2).add(Arrays.asList(
new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))
)));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals(2, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row)).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row2)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();

View File

@ -358,11 +358,17 @@ public class TestAsyncTableBatch {
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
.build(new RowMutations(row1)
.add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
.add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
.add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
.add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
RowMutations mutations = new RowMutations(row3)
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
.add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
@ -378,16 +384,28 @@ public class TestAsyncTableBatch {
checkAndMutate3, checkAndMutate4);
List<Object> results = table.batchAll(actions).get();
assertTrue(((CheckAndMutateResult) results.get(0)).isSuccess());
assertNull(((CheckAndMutateResult) results.get(0)).getResult());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0);
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(3L,
Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("d",
Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
assertEquals("b",
Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results.get(2)).getExists());
assertFalse(((CheckAndMutateResult) results.get(3)).isSuccess());
assertNull(((CheckAndMutateResult) results.get(3)).getResult());
Result result = (Result) results.get(2);
assertTrue(result.getExists());
assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
checkAndMutateResult = (CheckAndMutateResult) results.get(3);
assertFalse(checkAndMutateResult.isSuccess());
assertNull(checkAndMutateResult.getResult());
assertTrue(((Result) results.get(4)).isEmpty());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(5);
checkAndMutateResult = (CheckAndMutateResult) results.get(5);
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("F"))));
@ -397,12 +415,18 @@ public class TestAsyncTableBatch {
assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("G"))));
Result result = table.get(new Get(row1)).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row1)).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row3)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row4)).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
@ -134,6 +135,19 @@ public class TestAsyncTableNoncedRetry {
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
@Test
public void testAppendWhenReturnResultsEqualsFalse() throws InterruptedException,
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
.setReturnResults(false)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
}
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
@ -143,4 +157,17 @@ public class TestAsyncTableNoncedRetry {
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
}
@Test
public void testIncrementWhenReturnResultsEqualsFalse() throws InterruptedException,
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
.setReturnResults(false)).get();
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
}
}

View File

@ -704,6 +704,66 @@ public class TestCheckAndMutate {
}
}
@Test
public void testCheckAndRowMutations() throws Throwable {
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
try (Table table = createTable()) {
// Initial values
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
new Put(ROWKEY).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
new Put(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("a"))));
// Do CheckAndRowMutations
CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(ROWKEY)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(ROWKEY).add(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
new Delete(ROWKEY).addColumns(FAMILY, q2),
new Increment(ROWKEY).addColumn(FAMILY, q3, 1),
new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
CheckAndMutateResult result = table.checkAndMutate(checkAndMutate);
assertTrue(result.isSuccess());
assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
// Verify the value
Result r = table.get(new Get(ROWKEY));
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
// Do CheckAndRowMutations again
checkAndMutate = CheckAndMutate.newBuilder(ROWKEY)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(ROWKEY).add(Arrays.asList(
new Delete(ROWKEY).addColumns(FAMILY, q1),
new Put(ROWKEY).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
new Increment(ROWKEY).addColumn(FAMILY, q3, 1),
new Append(ROWKEY).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
result = table.checkAndMutate(checkAndMutate);
assertFalse(result.isSuccess());
assertNull(result.getResult());
// Verify the value
r = table.get(new Get(ROWKEY));
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
}
}
// Tests for batch version of checkAndMutate
@Test
@ -1100,4 +1160,62 @@ public class TestCheckAndMutate {
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
}
@Test
public void testCheckAndRowMutationsBatch() throws Throwable {
try (Table table = createTable()) {
table.put(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
.addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
);
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.build(new RowMutations(ROWKEY).add(Arrays.asList(
new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("B")),
new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
)));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
.build(new RowMutations(ROWKEY2).add(Arrays.asList(
new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("F")),
new Increment(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
new Append(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))
)));
List<CheckAndMutateResult> results =
table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
assertTrue(results.get(0).isSuccess());
assertEquals(2, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(ROWKEY));
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(ROWKEY2));
assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
}
}
}

View File

@ -506,11 +506,17 @@ public class TestFromClientSide3 {
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
.build(new RowMutations(row1)
.add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
.add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
.add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
.add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
RowMutations mutations = new RowMutations(row3)
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
.add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
.add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
@ -527,16 +533,28 @@ public class TestFromClientSide3 {
Object[] results = new Object[actions.size()];
table.batch(actions, results);
assertTrue(((CheckAndMutateResult) results[0]).isSuccess());
assertNull(((CheckAndMutateResult) results[0]).getResult());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(3L,
Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("d",
Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
assertEquals("b",
Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
assertTrue(((Result) results[2]).getExists());
assertFalse(((CheckAndMutateResult) results[3]).isSuccess());
assertNull(((CheckAndMutateResult) results[3]).getResult());
Result result = (Result) results[2];
assertTrue(result.getExists());
assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
checkAndMutateResult = (CheckAndMutateResult) results[3];
assertFalse(checkAndMutateResult.isSuccess());
assertNull(checkAndMutateResult.getResult());
assertTrue(((Result) results[4]).isEmpty());
CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[5];
checkAndMutateResult = (CheckAndMutateResult) results[5];
assertTrue(checkAndMutateResult.isSuccess());
assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("F"))));
@ -546,12 +564,18 @@ public class TestFromClientSide3 {
assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
.getValue(FAMILY, Bytes.toBytes("G"))));
Result result = table.get(new Get(row1));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row1));
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row3));
assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row4));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));

View File

@ -293,21 +293,27 @@ public class TestFromClientSide5 extends FromClientSideBase {
}
@Test
public void testRowMutation() throws Exception {
LOG.info("Starting testRowMutation");
public void testRowMutations() throws Exception {
LOG.info("Starting testRowMutations");
final TableName tableName = name.getTableName();
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
Bytes.toBytes("c"), Bytes.toBytes("d") };
// Test for Put operations
RowMutations arm = new RowMutations(ROW);
Put p = new Put(ROW);
p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
arm.add(p);
t.mutateRow(arm);
Result r = t.mutateRow(arm);
assertTrue(r.getExists());
assertTrue(r.isEmpty());
Get g = new Get(ROW);
Result r = t.get(g);
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
// Test for Put and Delete operations
arm = new RowMutations(ROW);
p = new Put(ROW);
p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
@ -316,11 +322,34 @@ public class TestFromClientSide5 extends FromClientSideBase {
d.addColumns(FAMILY, QUALIFIERS[0]);
arm.add(d);
// TODO: Trying mutateRow again. The batch was failing with a one try only.
t.mutateRow(arm);
r = t.mutateRow(arm);
assertTrue(r.getExists());
assertTrue(r.isEmpty());
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
// Test for Increment and Append operations
arm = new RowMutations(ROW);
arm.add(Arrays.asList(
new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE),
new Delete(ROW).addColumns(FAMILY, QUALIFIERS[1]),
new Increment(ROW).addColumn(FAMILY, QUALIFIERS[2], 5L),
new Append(ROW).addColumn(FAMILY, QUALIFIERS[3], Bytes.toBytes("abc"))
));
r = t.mutateRow(arm);
assertTrue(r.getExists());
assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2])));
assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3])));
g = new Get(ROW);
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
assertNull(r.getValue(FAMILY, QUALIFIERS[1]));
assertEquals(5L, Bytes.toLong(r.getValue(FAMILY, QUALIFIERS[2])));
assertEquals("abc", Bytes.toString(r.getValue(FAMILY, QUALIFIERS[3])));
// Test that we get a region level exception
try {
arm = new RowMutations(ROW);

View File

@ -777,14 +777,26 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPreBatchMutate.get() > 0;
}
public int getPreBatchMutate() {
return ctPreBatchMutate.get();
}
public boolean hadPostBatchMutate() {
return ctPostBatchMutate.get() > 0;
}
public int getPostBatchMutate() {
return ctPostBatchMutate.get();
}
public boolean hadPostBatchMutateIndispensably() {
return ctPostBatchMutateIndispensably.get() > 0;
}
public int getPostBatchMutateIndispensably() {
return ctPostBatchMutateIndispensably.get();
}
public boolean hadPostStartRegionOperation() {
return ctPostStartRegionOperation.get() > 0;
}

View File

@ -243,14 +243,16 @@ public class TestRegionObserverInterface {
inc.addColumn(A, A, 1);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
tableName, new Boolean[] { false, false, false });
new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
"hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
tableName, new Boolean[] { false, false, false, false, false, false });
table.increment(inc);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock" },
tableName, new Boolean[] { true, true, true });
new String[] { "hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock",
"hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
tableName, new Boolean[] { true, true, true, true, true, true });
} finally {
util.deleteTable(tableName);
table.close();
@ -339,7 +341,75 @@ public class TestRegionObserverInterface {
}
@Test
public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
public void testCheckAndIncrementHooks() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });
try {
byte[] row = Bytes.toBytes(0);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 0, 0, 0 });
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(A, A)
.build(new Increment(row).addColumn(A, A, 1)));
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1 });
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(A, A, Bytes.toBytes(1L))
.build(new Increment(row).addColumn(A, A, 1)));
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 2, 2, 2 });
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testCheckAndAppendHooks() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });
try {
byte[] row = Bytes.toBytes(0);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 0, 0, 0 });
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(A, A)
.build(new Append(row).addColumn(A, A, A)));
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1 });
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(A, A, A)
.build(new Append(row).addColumn(A, A, A)));
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 2, 2, 2 });
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testCheckAndRowMutationsHooks() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });
@ -388,14 +458,18 @@ public class TestRegionObserverInterface {
app.addColumn(A, A, A);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
new Boolean[] { false, false, false });
new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
"hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
tableName,
new Boolean[] { false, false, false, false, false, false });
table.append(app);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock" }, tableName,
new Boolean[] { true, true, true });
new String[] { "hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock",
"hadPreBatchMutate", "hadPostBatchMutate", "hadPostBatchMutateIndispensably" },
tableName,
new Boolean[] { true, true, true, true, true, true });
} finally {
util.deleteTable(tableName);
table.close();

View File

@ -280,7 +280,7 @@ public class RegionAsTable implements Table {
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
public Result mutateRow(RowMutations rm) throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -61,6 +61,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
@ -2390,6 +2391,7 @@ public class TestHRegion {
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Putting data in key
put = new Put(row1);
@ -2399,17 +2401,20 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// not empty anymore
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
Delete delete = new Delete(row1);
delete.addColumn(fam1, qf1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertFalse(res.isSuccess());
assertNull(res.getResult());
put = new Put(row1);
put.addColumn(fam1, qf1, val2);
@ -2417,6 +2422,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// checkAndDelete with correct value
delete = new Delete(row1);
@ -2425,11 +2431,13 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
assertTrue(res.isSuccess());
assertNull(res.getResult());
delete = new Delete(row1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// checkAndPut looking for a null value
put = new Put(row1);
@ -2438,6 +2446,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
.build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
}
@Test
@ -2461,6 +2470,7 @@ public class TestHRegion {
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// checkAndDelete with wrong value
Delete delete = new Delete(row1);
@ -2468,6 +2478,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Putting data in key
put = new Put(row1);
@ -2478,6 +2489,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// checkAndDelete with wrong value
delete = new Delete(row1);
@ -2485,6 +2497,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete));
assertFalse(res.isSuccess());
assertNull(res.getResult());
}
@Test
@ -2514,6 +2527,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
assertTrue("Delete", res.isSuccess());
assertNull(res.getResult());
// Putting data in key
put = new Put(row1);
@ -2524,6 +2538,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put));
assertTrue("Second put", res.isSuccess());
assertNull(res.getResult());
// checkAndDelete with correct value
delete = new Delete(row1, now + 3);
@ -2531,6 +2546,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete));
assertTrue("Second delete", res.isSuccess());
assertNull(res.getResult());
}
@Test
@ -2554,11 +2570,13 @@ public class TestHRegion {
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.LESS: original = val3, compare with val4, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.LESS: original = val3, compare with val2,
// succeed (now value = val2)
@ -2567,17 +2585,20 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
// succeed (value still = val2)
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
// succeed (now value = val3)
@ -2586,16 +2607,19 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER: original = val3, compare with val3, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER: original = val3, compare with val2, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER: original = val3, compare with val4,
// succeed (now value = val2)
@ -2604,22 +2628,26 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put));
assertFalse(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
// succeed (value still = val2)
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
}
@Test
@ -2650,6 +2678,7 @@ public class TestHRegion {
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
assertTrue(res.isSuccess());
assertNull(res.getResult());
Get get = new Get(row1);
get.addColumn(fam2, qf1);
@ -2704,6 +2733,7 @@ public class TestHRegion {
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
assertTrue(res.isSuccess());
assertNull(res.getResult());
Get get = new Get(row1);
get.addColumn(fam1, qf1);
@ -2720,6 +2750,7 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertTrue(res.isSuccess());
assertNull(res.getResult());
get = new Get(row1);
r = region.get(get);
@ -2731,6 +2762,8 @@ public class TestHRegion {
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
assertTrue(res.isSuccess());
assertNull(res.getResult());
get = new Get(row1);
r = region.get(get);
assertEquals(0, r.size());
@ -2759,6 +2792,7 @@ public class TestHRegion {
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(res.isSuccess());
assertNull(res.getResult());
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
@ -2772,6 +2806,7 @@ public class TestHRegion {
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
@ -2784,6 +2819,7 @@ public class TestHRegion {
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(res.isSuccess());
assertNull(res.getResult());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
@ -2799,6 +2835,7 @@ public class TestHRegion {
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
@ -2823,6 +2860,7 @@ public class TestHRegion {
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(res.isSuccess());
assertNull(res.getResult());
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
@ -2834,10 +2872,11 @@ public class TestHRegion {
.timeRange(TimeRange.between(0, 100))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(res.isSuccess());
assertNull(res.getResult());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
// Mutate with success
// RowMutations with success
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")))
@ -2847,6 +2886,7 @@ public class TestHRegion {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(res.isSuccess());
assertNull(res.getResult());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
@ -2975,6 +3015,126 @@ public class TestHRegion {
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndIncrementAndAppend() throws Throwable {
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
// CheckAndMutate with Increment and Append
CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(fam1, qual)
.build(new RowMutations(row)
.add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L))
.add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a")))
);
CheckAndMutateResult result = region.checkAndMutate(checkAndMutate);
assertTrue(result.isSuccess());
assertEquals(1L, Bytes.toLong(result.getResult().getValue(fam1, qual1)));
assertEquals("a", Bytes.toString(result.getResult().getValue(fam1, qual2)));
Result r = region.get(new Get(row));
assertEquals(1L, Bytes.toLong(r.getValue(fam1, qual1)));
assertEquals("a", Bytes.toString(r.getValue(fam1, qual2)));
// Set return results to false
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(fam1, qual)
.build(new RowMutations(row)
.add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L).setReturnResults(false))
.add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a"))
.setReturnResults(false))
);
result = region.checkAndMutate(checkAndMutate);
assertTrue(result.isSuccess());
assertNull(result.getResult().getValue(fam1, qual1));
assertNull(result.getResult().getValue(fam1, qual2));
r = region.get(new Get(row));
assertEquals(2L, Bytes.toLong(r.getValue(fam1, qual1)));
assertEquals("aa", Bytes.toString(r.getValue(fam1, qual2)));
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(fam1, qual)
.build(new RowMutations(row)
.add((Mutation) new Increment(row).addColumn(fam1, qual1, 1L))
.add((Mutation) new Append(row).addColumn(fam1, qual2, Bytes.toBytes("a"))
.setReturnResults(false))
);
result = region.checkAndMutate(checkAndMutate);
assertTrue(result.isSuccess());
assertEquals(3L, Bytes.toLong(result.getResult().getValue(fam1, qual1)));
assertNull(result.getResult().getValue(fam1, qual2));
r = region.get(new Get(row));
assertEquals(3L, Bytes.toLong(r.getValue(fam1, qual1)));
assertEquals("aaa", Bytes.toString(r.getValue(fam1, qual2)));
}
@Test
public void testCheckAndRowMutations() throws Throwable {
final byte[] row = Bytes.toBytes("row");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
region = initHRegion(tableName, method, CONF, fam1);
// Initial values
region.batchMutate(new Mutation[] {
new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")),
new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)),
new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")),
});
// Do CheckAndRowMutations
CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(fam1, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)),
new Delete(row).addColumns(fam1, q2),
new Increment(row).addColumn(fam1, q3, 1),
new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))
);
CheckAndMutateResult result = region.checkAndMutate(checkAndMutate);
assertTrue(result.isSuccess());
assertEquals(6L, Bytes.toLong(result.getResult().getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(result.getResult().getValue(fam1, q4)));
// Verify the value
Result r = region.get(new Get(row));
assertEquals(v1, Bytes.toString(r.getValue(fam1, q1)));
assertNull(r.getValue(fam1, q2));
assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(r.getValue(fam1, q4)));
// Do CheckAndRowMutations again
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(fam1, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Delete(row).addColumns(fam1, q1),
new Put(row).addColumn(fam1, q2, Bytes.toBytes(v1)),
new Increment(row).addColumn(fam1, q3, 1),
new Append(row).addColumn(fam1, q4, Bytes.toBytes("b"))))
);
result = region.checkAndMutate(checkAndMutate);
assertFalse(result.isSuccess());
assertNull(result.getResult());
// Verify the value
r = region.get(new Get(row));
assertEquals(v1, Bytes.toString(r.getValue(fam1, q1)));
assertNull(r.getValue(fam1, q2));
assertEquals(6L, Bytes.toLong(r.getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(r.getValue(fam1, q4)));
}
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////
@ -7232,6 +7392,149 @@ public class TestHRegion {
}
}
@Test
public void testMutateRow() throws Exception {
final byte[] row = Bytes.toBytes("row");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
region = initHRegion(tableName, method, CONF, fam1);
// Initial values
region.batchMutate(new Mutation[] {
new Put(row).addColumn(fam1, q2, Bytes.toBytes("toBeDeleted")),
new Put(row).addColumn(fam1, q3, Bytes.toBytes(5L)),
new Put(row).addColumn(fam1, q4, Bytes.toBytes("a")),
});
// Do mutateRow
Result result = region.mutateRow(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1)),
new Delete(row).addColumns(fam1, q2),
new Increment(row).addColumn(fam1, q3, 1),
new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))));
assertNotNull(result);
assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
// Verify the value
result = region.get(new Get(row));
assertEquals(v1, Bytes.toString(result.getValue(fam1, q1)));
assertNull(result.getValue(fam1, q2));
assertEquals(6L, Bytes.toLong(result.getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
}
@Test
public void testMutateRowInParallel() throws Exception {
final int numReaderThreads = 100;
final CountDownLatch latch = new CountDownLatch(numReaderThreads);
final byte[] row = Bytes.toBytes("row");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
final String v2 = "v2";
// We need to ensure the timestamp of the delete operation is more than the previous one
final AtomicLong deleteTimestamp = new AtomicLong();
region = initHRegion(tableName, method, CONF, fam1);
// Initial values
region.batchMutate(new Mutation[] {
new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1))
.addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2))
.addColumn(fam1, q3, Bytes.toBytes(1L))
.addColumn(fam1, q4, Bytes.toBytes("a"))
});
final AtomicReference<AssertionError> assertionError = new AtomicReference<>();
// Writer thread
Thread writerThread = new Thread(() -> {
try {
while (true) {
// If all the reader threads finish, then stop the writer thread
if (latch.await(0, TimeUnit.MILLISECONDS)) {
return;
}
// Execute the mutations. This should be done atomically
region.mutateRow(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(fam1, q1, Bytes.toBytes(v2)),
new Delete(row).addColumns(fam1, q2, deleteTimestamp.getAndIncrement()),
new Increment(row).addColumn(fam1, q3, 1L),
new Append(row).addColumn(fam1, q4, Bytes.toBytes("b")))));
// We need to ensure the timestamps of the Increment/Append operations are more than the
// previous ones
Result result = region.get(new Get(row).addColumn(fam1, q3).addColumn(fam1, q4));
long tsIncrement = result.getColumnLatestCell(fam1, q3).getTimestamp();
long tsAppend = result.getColumnLatestCell(fam1, q4).getTimestamp();
// Put the initial values
region.batchMutate(new Mutation[] {
new Put(row).addColumn(fam1, q1, Bytes.toBytes(v1))
.addColumn(fam1, q2, deleteTimestamp.getAndIncrement(), Bytes.toBytes(v2))
.addColumn(fam1, q3, tsIncrement + 1, Bytes.toBytes(1L))
.addColumn(fam1, q4, tsAppend + 1, Bytes.toBytes("a"))
});
}
} catch (Exception e) {
e.printStackTrace();
assertionError.set(new AssertionError(e.getMessage()));
}
});
writerThread.start();
// Reader threads
for (int i = 0; i < numReaderThreads; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 10000; j++) {
// Verify the values
Result result = region.get(new Get(row));
// The values should be equals to either the initial values or the values after
// executing the mutations
String q1Value = Bytes.toString(result.getValue(fam1, q1));
if (v1.equals(q1Value)) {
assertEquals(v2, Bytes.toString(result.getValue(fam1, q2)));
assertEquals(1L, Bytes.toLong(result.getValue(fam1, q3)));
assertEquals("a", Bytes.toString(result.getValue(fam1, q4)));
} else if (v2.equals(q1Value)) {
assertNull(Bytes.toString(result.getValue(fam1, q2)));
assertEquals(2L, Bytes.toLong(result.getValue(fam1, q3)));
assertEquals("ab", Bytes.toString(result.getValue(fam1, q4)));
} else {
fail("the qualifier " + q1 + " should be " + v1 + " or " + v2 + ", but " + q1Value);
}
}
} catch (Exception e) {
e.printStackTrace();
assertionError.set(new AssertionError(e.getMessage()));
} catch (AssertionError e) {
assertionError.set(e);
}
latch.countDown();
}).start();
}
writerThread.join();
if (assertionError.get() != null) {
throw assertionError.get();
}
}
@Test
public void testMutateRow_WriteRequestCount() throws Exception {
byte[] row1 = Bytes.toBytes("row1");

View File

@ -415,8 +415,8 @@ public class TestWALEntrySinkFilter {
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
public Result mutateRow(RowMutations rm) throws IOException {
return null;
}
@Override

View File

@ -447,10 +447,11 @@ public class ThriftTable implements Table {
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
public Result mutateRow(RowMutations rm) throws IOException {
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
try {
client.mutateRow(tableNameInBytes, tRowMutations);
return Result.EMPTY_RESULT;
} catch (TException e) {
throw new IOException(e);
}