HBASE-24680 Refactor the checkAndMutate code on the server side (#2094)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Toshihiro Suzuki 2020-08-01 21:02:17 +09:00 committed by GitHub
parent a3f623eea7
commit e22a2d2700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1214 additions and 619 deletions

View File

@ -216,7 +216,7 @@ public final class CheckAndMutate extends Mutation {
this.op = op;
this.value = value;
this.filter = null;
this.timeRange = timeRange;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
}
@ -227,7 +227,7 @@ public final class CheckAndMutate extends Mutation {
this.op = null;
this.value = null;
this.filter = filter;
this.timeRange = timeRange;
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
this.action = action;
}
@ -266,6 +266,13 @@ public final class CheckAndMutate extends Mutation {
return filter;
}
/**
* @return whether this has a filter or not
*/
public boolean hasFilter() {
return filter != null;
}
/**
* @return the time range to check
*/

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@ -84,6 +86,7 @@ import org.apache.hadoop.hbase.client.RegionLoadStats;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.client.SnapshotDescription;
@ -3543,4 +3546,73 @@ public final class ProtobufUtil {
return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()).addAllServers(hostports)
.addAllTables(tables).addAllConfiguration(configuration).build();
}
public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
MutationProto mutation, CellScanner cellScanner) throws IOException {
byte[] row = condition.getRow().toByteArray();
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
if (filter != null) {
builder.ifMatches(filter);
} else {
builder.ifMatches(condition.getFamily().toByteArray(),
condition.getQualifier().toByteArray(),
CompareOperator.valueOf(condition.getCompareType().name()),
ProtobufUtil.toComparator(condition.getComparator()).getValue());
}
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
builder.timeRange(timeRange);
try {
MutationType type = mutation.getMutateType();
switch (type) {
case PUT:
return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
case DELETE:
return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
}
public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
List<Mutation> mutations) throws IOException {
assert mutations.size() > 0;
byte[] row = condition.getRow().toByteArray();
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
if (filter != null) {
builder.ifMatches(filter);
} else {
builder.ifMatches(condition.getFamily().toByteArray(),
condition.getQualifier().toByteArray(),
CompareOperator.valueOf(condition.getCompareType().name()),
ProtobufUtil.toComparator(condition.getComparator()).getValue());
}
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
builder.timeRange(timeRange);
try {
if (mutations.size() == 1) {
Mutation m = mutations.get(0);
if (m instanceof Put) {
return builder.build((Put) m);
} else if (m instanceof Delete) {
return builder.build((Delete) m);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
.getClass().getSimpleName().toUpperCase());
}
} else {
return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
}
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
}
}

View File

@ -86,6 +86,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
*/
void updateCheckAndPut(long t);
/**
* Update checkAndMutate histogram
* @param t time it took
*/
void updateCheckAndMutate(long t);
/**
* Update the Get time histogram .
*
@ -399,6 +405,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String DELETE_KEY = "delete";
String CHECK_AND_DELETE_KEY = "checkAndDelete";
String CHECK_AND_PUT_KEY = "checkAndPut";
String CHECK_AND_MUTATE_KEY = "checkAndMutate";
String DELETE_BATCH_KEY = "deleteBatch";
String GET_SIZE_KEY = "getSize";
String GET_KEY = "get";

View File

@ -42,6 +42,7 @@ public class MetricsRegionServerSourceImpl
private final MetricHistogram deleteBatchHisto;
private final MetricHistogram checkAndDeleteHisto;
private final MetricHistogram checkAndPutHisto;
private final MetricHistogram checkAndMutateHisto;
private final MetricHistogram getHisto;
private final MetricHistogram incrementHisto;
private final MetricHistogram appendHisto;
@ -112,6 +113,7 @@ public class MetricsRegionServerSourceImpl
deleteBatchHisto = getMetricsRegistry().newTimeHistogram(DELETE_BATCH_KEY);
checkAndDeleteHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_DELETE_KEY);
checkAndPutHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_PUT_KEY);
checkAndMutateHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_MUTATE_KEY);
getHisto = getMetricsRegistry().newTimeHistogram(GET_KEY);
slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0L);
@ -618,6 +620,11 @@ public class MetricsRegionServerSourceImpl
checkAndPutHisto.add(t);
}
@Override
public void updateCheckAndMutate(long t) {
checkAndMutateHisto.add(t);
}
@Override
public void updatePutBatch(long t) {
putBatchHisto.add(t);

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@ -514,7 +517,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
boolean result) throws IOException {
@ -535,7 +542,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
@ -562,7 +573,12 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
* instead.
*/
@Deprecated
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, Put put, boolean result) throws IOException {
@ -587,7 +603,12 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
* instead.
*/
@Deprecated
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Put put, boolean result) throws IOException {
return result;
@ -607,7 +628,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed return value to return to client
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
boolean result) throws IOException {
@ -625,7 +650,11 @@ public interface RegionObserver {
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed return value to return to client
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Put put, boolean result) throws IOException {
return result;
@ -648,7 +677,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@ -669,7 +702,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
@ -696,7 +733,12 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
* instead.
*/
@Deprecated
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
@ -721,7 +763,12 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
* instead.
*/
@Deprecated
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
return result;
@ -741,7 +788,11 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@ -759,12 +810,150 @@ public interface RegionObserver {
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
*/
@Deprecated
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
Filter filter, Delete delete, boolean result) throws IOException {
return result;
}
/**
* Called before checkAndMutate
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in actions beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param checkAndMutate the CheckAndMutate object
* @param result the default value of the result
* @return the return value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
default CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
if (checkAndMutate.getAction() instanceof Put) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
(Put) checkAndMutate.getAction(), result.isSuccess());
} else {
success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
new BinaryComparator(checkAndMutate.getValue()), (Put) checkAndMutate.getAction(),
result.isSuccess());
}
return new CheckAndMutateResult(success, null);
} else if (checkAndMutate.getAction() instanceof Delete) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
(Delete) checkAndMutate.getAction(), result.isSuccess());
} else {
success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
new BinaryComparator(checkAndMutate.getValue()), (Delete) checkAndMutate.getAction(),
result.isSuccess());
}
return new CheckAndMutateResult(success, null);
}
return result;
}
/**
* Called before checkAndDelete but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
* can lead to potential deadlock.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
* <p>
* Note: Do not retain references to any Cells in actions beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param checkAndMutate the CheckAndMutate object
* @param result the default value of the result
* @return the value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
default CheckAndMutateResult preCheckAndMutateAfterRowLock(
ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
CheckAndMutateResult result) throws IOException {
if (checkAndMutate.getAction() instanceof Put) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
} else {
success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
(Put) checkAndMutate.getAction(), result.isSuccess());
}
return new CheckAndMutateResult(success, null);
} else if (checkAndMutate.getAction() instanceof Delete) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
} else {
success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
(Delete) checkAndMutate.getAction(), result.isSuccess());
}
return new CheckAndMutateResult(success, null);
}
return result;
}
/**
* Called after checkAndMutate
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param checkAndMutate the CheckAndMutate object
* @param result from the checkAndMutate
* @return the possibly transformed returned value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
default CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
if (checkAndMutate.getAction() instanceof Put) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = postCheckAndPut(c, checkAndMutate.getRow(),
checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
} else {
success = postCheckAndPut(c, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
(Put) checkAndMutate.getAction(), result.isSuccess());
}
return new CheckAndMutateResult(success, null);
} else if (checkAndMutate.getAction() instanceof Delete) {
boolean success;
if (checkAndMutate.hasFilter()) {
success = postCheckAndDelete(c, checkAndMutate.getRow(),
checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
} else {
success = postCheckAndDelete(c, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
(Delete) checkAndMutate.getAction(), result.isSuccess());
}
return new CheckAndMutateResult(success, null);
}
return result;
}
/**
* Called before Append.
* <p>

View File

@ -102,6 +102,8 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
@ -127,6 +129,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
@ -4278,43 +4281,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
@Deprecated
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
mutation);
CheckAndMutate checkAndMutate;
try {
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row)
.ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange);
if (mutation instanceof Put) {
checkAndMutate = builder.build((Put) mutation);
} else if (mutation instanceof Delete) {
checkAndMutate = builder.build((Delete) mutation);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
.getSimpleName().toUpperCase());
}
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
@Deprecated
public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException {
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
CheckAndMutate checkAndMutate;
try {
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row).ifMatches(filter)
.timeRange(timeRange);
if (mutation instanceof Put) {
checkAndMutate = builder.build((Put) mutation);
} else if (mutation instanceof Delete) {
checkAndMutate = builder.build((Delete) mutation);
} else {
throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
.getSimpleName().toUpperCase());
}
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
@Deprecated
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
CheckAndMutate checkAndMutate;
try {
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange).build(rm);
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
return checkAndMutate(checkAndMutate).isSuccess();
}
@Override
@Deprecated
public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
throws IOException {
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
CheckAndMutate checkAndMutate;
try {
checkAndMutate = CheckAndMutate.newBuilder(row).ifMatches(filter).timeRange(timeRange)
.build(rm);
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException(e.getMessage());
}
return checkAndMutate(checkAndMutate).isSuccess();
}
/**
* checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
* switches in the few places where there is deviation.
*/
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
RowMutations rowMutations, Mutation mutation)
throws IOException {
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
// One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
// need these commented out checks.
// if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
// if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
byte[] row = checkAndMutate.getRow();
Filter filter = null;
byte[] family = null;
byte[] qualifier = null;
CompareOperator op = null;
ByteArrayComparable comparator = null;
if (checkAndMutate.hasFilter()) {
filter = checkAndMutate.getFilter();
} else {
family = checkAndMutate.getFamily();
qualifier = checkAndMutate.getQualifier();
op = checkAndMutate.getCompareOp();
comparator = new BinaryComparator(checkAndMutate.getValue());
}
TimeRange timeRange = checkAndMutate.getTimeRange();
Mutation mutation = null;
RowMutations rowMutations = null;
if (checkAndMutate.getAction() instanceof Mutation) {
mutation = (Mutation) checkAndMutate.getAction();
} else {
rowMutations = (RowMutations) checkAndMutate.getAction();
}
if (mutation != null) {
checkMutationType(mutation);
checkRow(mutation, row);
@ -4341,32 +4404,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkRow(row, "doCheckAndRowMutate");
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
try {
if (mutation != null && this.getCoprocessorHost() != null) {
// Call coprocessor.
Boolean processed = null;
if (mutation instanceof Put) {
if (filter != null) {
processed = this.getCoprocessorHost()
.preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
} else {
processed = this.getCoprocessorHost()
.preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
(Put) mutation);
}
} else if (mutation instanceof Delete) {
if (filter != null) {
processed = this.getCoprocessorHost()
.preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
} else {
processed = this.getCoprocessorHost()
.preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
(Delete) mutation);
}
}
if (processed != null) {
return processed;
if (this.getCoprocessorHost() != null) {
CheckAndMutateResult result =
getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate);
if (result != null) {
return result;
}
}
// NOTE: We used to wait here until mvcc caught up: mvcc.await();
// Supposition is that now all changes are done under row locks, then when we go to read,
// we'll get the latest on this row.
@ -4424,10 +4469,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
mutateRow(rowMutations);
}
this.checkAndMutateChecksPassed.increment();
return true;
return new CheckAndMutateResult(true, null);
}
this.checkAndMutateChecksFailed.increment();
return false;
return new CheckAndMutateResult(false, null);
} finally {
rowLock.release();
}

View File

@ -153,6 +153,10 @@ public class MetricsRegionServer {
serverSource.updateCheckAndPut(t);
}
public void updateCheckAndMutate(long t) {
serverSource.updateCheckAndMutate(t);
}
public void updateGet(TableName tn, long t) {
if (tableMetrics != null && tn != null) {
tableMetrics.updateGet(tn, t);

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -79,7 +80,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
@ -88,10 +88,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@ -624,62 +621,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
/**
* Mutate a list of rows atomically.
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
RowMutations rm = null;
int i = 0;
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
List<Mutation> mutations = new ArrayList<>();
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
MutationType type = action.getMutation().getMutateType();
if (rm == null) {
rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
}
switch (type) {
case PUT:
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, put);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
rm.add(put);
mutations.add(put);
break;
case DELETE:
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
++countOfCompleteMutation;
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
rm.add(del);
mutations.add(del);
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
// To unify the response format with doNonAtomicRegionMutation and read through client's
// AsyncProcess we have to add an empty result instance per operation
resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i++);
builder.addResultOrException(
resultOrExceptionOrBuilder.build());
}
if (filter != null) {
return region.checkAndRowMutate(row, filter, timeRange, rm);
if (mutations.size() == 0) {
return new CheckAndMutateResult(true, null);
} else {
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
CheckAndMutateResult result = null;
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
result = region.checkAndMutate(checkAndMutate);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
}
return result;
}
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
@ -2856,26 +2846,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier =
condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) :
null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter =
condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
boolean processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
responseBuilder.setProcessed(processed);
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, request.getCondition(), spaceQuotaEnforcement);
responseBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
for (int i = 0; i < regionAction.getActionCount(); i++) {
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
}
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
@ -2928,80 +2910,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
Condition condition = regionAction.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
boolean processed;
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
// RowMutations
processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
} else {
if (regionAction.getActionList().isEmpty()) {
// If the region action list is empty, do nothing.
regionActionResultBuilder.setProcessed(true);
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
.setResult(ProtobufUtil.toResult(result.getResult())).build());
continue;
}
Action action = regionAction.getAction(0);
if (action.hasGet()) {
throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
+ action.getGet());
}
MutationProto mutation = action.getMutation();
switch (mutation.getMutateType()) {
case PUT:
Put put = ProtobufUtil.toPut(mutation, cellScanner);
checkCellSizeLimit(region, put);
// Throws an exception when violated
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
quota.addMutation(put);
if (filter != null) {
processed = region.checkAndMutate(row, filter, timeRange, put);
} else {
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
timeRange, put);
}
break;
case DELETE:
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
checkCellSizeLimit(region, delete);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
quota.addMutation(delete);
if (filter != null) {
processed = region.checkAndMutate(row, filter, timeRange, delete);
} else {
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
timeRange, delete);
}
break;
default:
throw new DoNotRetryIOException("CheckAndMutate doesn't support "
+ mutation.getMutateType());
}
// To unify the response format with doNonAtomicRegionMutation and read through
// client's AsyncProcess we have to add an empty result instance per operation
regionActionResultBuilder.addResultOrException(
ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
resultOrExceptionOrBuilder.clear();
resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
}
regionActionResultBuilder.setProcessed(processed);
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
// As it's an atomic operation with a condition, we may expect it's a global failure.
@ -3117,10 +3043,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null;
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
ActivePolicyEnforcement spaceQuotaEnforcement = null;
MutationType type = null;
HRegion region = null;
long before = EnvironmentEdgeManager.currentTime();
// Clear scanner so we are not holding on to reference across call.
if (controller != null) {
controller.setCellScanner(null);
@ -3129,7 +3051,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
rpcMutateRequestCount.increment();
region = getRegion(request.getRegion());
HRegion region = getRegion(request.getRegion());
rejectIfInStandByState(region);
MutateResponse.Builder builder = MutateResponse.newBuilder();
MutationProto mutation = request.getMutation();
@ -3137,136 +3059,51 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
Result r = null;
Boolean processed = null;
type = mutation.getMutateType();
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager()
.getActiveEnforcements();
switch (type) {
case APPEND:
// TODO: this doesn't actually check anything.
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case INCREMENT:
// TODO: this doesn't actually check anything.
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case PUT:
Put put = ProtobufUtil.toPut(mutation, cellScanner);
checkCellSizeLimit(region, put);
// Throws an exception when violated
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
quota.addMutation(put);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
if (filter != null) {
processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
} else {
processed = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, op, comparator, put);
}
}
if (processed == null) {
boolean result;
if (filter != null) {
result = region.checkAndMutate(row, filter, timeRange, put);
} else {
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
put);
}
if (region.getCoprocessorHost() != null) {
if (filter != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
} else {
result = region.getCoprocessorHost()
.postCheckAndPut(row, family, qualifier, op, comparator, put, result);
}
}
processed = result;
}
} else {
region.put(put);
if (request.hasCondition()) {
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
request.getCondition(), spaceQuotaEnforcement);
builder.setProcessed(result.isSuccess());
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, result.getResult(), null);
}
} else {
Result r = null;
Boolean processed = null;
MutationType type = mutation.getMutateType();
switch (type) {
case APPEND:
// TODO: this doesn't actually check anything.
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case INCREMENT:
// TODO: this doesn't actually check anything.
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
break;
case PUT:
put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
}
break;
case DELETE:
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
checkCellSizeLimit(region, delete);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
quota.addMutation(delete);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
condition.getQualifier().toByteArray() : null;
CompareOperator op = condition.hasCompareType() ?
CompareOperator.valueOf(condition.getCompareType().name()) : null;
ByteArrayComparable comparator = condition.hasComparator() ?
ProtobufUtil.toComparator(condition.getComparator()) : null;
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
if (filter != null) {
processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
} else {
processed = region.getCoprocessorHost()
.preCheckAndDelete(row, family, qualifier, op, comparator, delete);
}
}
if (processed == null) {
boolean result;
if (filter != null) {
result = region.checkAndMutate(row, filter, timeRange, delete);
} else {
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
delete);
}
if (region.getCoprocessorHost() != null) {
if (filter != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
result);
} else {
result = region.getCoprocessorHost()
.postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
}
}
processed = result;
}
} else {
region.delete(delete);
break;
case DELETE:
delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
processed = Boolean.TRUE;
}
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (processed != null) {
builder.setProcessed(processed.booleanValue());
}
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, r, controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, r, null);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (processed != null) {
builder.setProcessed(processed);
}
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, r, controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, r, null);
}
}
return builder.build();
} catch (IOException ie) {
@ -3276,32 +3113,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (quota != null) {
quota.close();
}
// Update metrics
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null && type != null) {
long after = EnvironmentEdgeManager.currentTime();
switch (type) {
case DELETE:
if (request.hasCondition()) {
metricsRegionServer.updateCheckAndDelete(after - before);
} else {
metricsRegionServer.updateDelete(
region == null ? null : region.getRegionInfo().getTable(), after - before);
}
break;
}
}
private void put(HRegion region, OperationQuota quota, MutationProto mutation,
CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Put put = ProtobufUtil.toPut(mutation, cellScanner);
checkCellSizeLimit(region, put);
spaceQuota.getPolicyEnforcement(region).check(put);
quota.addMutation(put);
region.put(put);
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
}
}
private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
checkCellSizeLimit(region, delete);
spaceQuota.getPolicyEnforcement(region).check(delete);
quota.addMutation(delete);
region.delete(delete);
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
}
}
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
MutationProto mutation, CellScanner cellScanner, Condition condition,
ActivePolicyEnforcement spaceQuota) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
cellScanner);
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
quota.addMutation((Mutation) checkAndMutate.getAction());
CheckAndMutateResult result = null;
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
result = region.checkAndMutate(checkAndMutate);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
}
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
metricsRegionServer.updateCheckAndMutate(after - before);
MutationType type = mutation.getMutateType();
switch (type) {
case PUT:
if (request.hasCondition()) {
metricsRegionServer.updateCheckAndPut(after - before);
} else {
metricsRegionServer.updatePut(
region == null ? null : region.getRegionInfo().getTable(),after - before);
}
metricsRegionServer.updateCheckAndPut(after - before);
break;
case DELETE:
metricsRegionServer.updateCheckAndDelete(after - before);
break;
default:
break;
}
}
}
return result;
}
// This is used to keep compatible with the old client implementation. Consider remove it if we

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@ -311,7 +313,11 @@ public interface Region extends ConfigurationObserver {
* @param comparator the expected value
* @param mutation data to put if check succeeds
* @return true if mutation was applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, Mutation mutation) throws IOException {
return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
@ -330,7 +336,11 @@ public interface Region extends ConfigurationObserver {
* @param mutation data to put if check succeeds
* @param timeRange time range to check
* @return true if mutation was applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
@ -341,7 +351,11 @@ public interface Region extends ConfigurationObserver {
* @param filter the filter
* @param mutation data to put if check succeeds
* @return true if mutation was applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
throws IOException {
return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
@ -355,7 +369,11 @@ public interface Region extends ConfigurationObserver {
* @param mutation data to put if check succeeds
* @param timeRange time range to check
* @return true if mutation was applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
throws IOException;
@ -371,7 +389,11 @@ public interface Region extends ConfigurationObserver {
* @param comparator the expected value
* @param mutations data to put if check succeeds
* @return true if mutations were applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, RowMutations mutations) throws IOException {
return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
@ -391,7 +413,11 @@ public interface Region extends ConfigurationObserver {
* @param mutations data to put if check succeeds
* @param timeRange time range to check
* @return true if mutations were applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
throws IOException;
@ -404,7 +430,11 @@ public interface Region extends ConfigurationObserver {
* @param filter the filter
* @param mutations data to put if check succeeds
* @return true if mutations were applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
throws IOException {
return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
@ -419,10 +449,24 @@ public interface Region extends ConfigurationObserver {
* @param mutations data to put if check succeeds
* @param timeRange time range to check
* @return true if mutations were applied, false otherwise
*
* @deprecated since 3.0.0 and will be removed in 4.0.0. Use
* {@link #checkAndMutate(CheckAndMutate)} instead.
*/
@Deprecated
boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
RowMutations mutations) throws IOException;
/**
* Atomically checks if a row matches the conditions and if it does, it performs the actions.
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
* time.
* @param checkAndMutate the CheckAndMutate object
* @return true if mutations were applied, false otherwise
* @throws IOException if an error occurred in this method
*/
CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException;
/**
* Deletes the specified cells/row.
* @param delete

View File

@ -31,13 +31,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.RawCellBuilderFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -62,8 +63,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -1042,322 +1041,70 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Put put)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPut(this, row, family, qualifier,
op, comparator, put, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPut(this, row, filter, put, getResult());
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.preCheckAndMutate(this, checkAndMutate, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @param checkAndMutate the CheckAndMutate object
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndPutAfterRowLock(
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Put put) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
op, comparator, put, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndPutAfterRowLock(
final byte[] row, final Filter filter, final Put put) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
}
});
}
/**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @throws IOException e
*/
public boolean postCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Put put,
boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndPut(this, row, family, qualifier,
op, comparator, put, getResult());
}
});
}
/**
* @param row row to check
* @param filter filter
* @param put data to put if check succeeds
* @throws IOException e
*/
public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndPut(this, row, filter, put, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Delete delete)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDelete(this, row, family,
qualifier, op, comparator, delete, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
*/
public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
regionObserverGetter, defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDelete(this, row, filter, delete, getResult());
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed,
* or null otherwise
* @param checkAndMutate the CheckAndMutate object
* @param result the result returned by the checkAndMutate
* @return true or false to return to client if default processing should be bypassed, or null
* otherwise
* @throws IOException if an error occurred on the coprocessor
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
final Delete delete) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDeleteAfterRowLock(this, row,
family, qualifier, op, comparator, delete, getResult());
}
});
}
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should be bypassed,
* or null otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
final Delete delete) throws IOException {
boolean bypassable = true;
boolean defaultResult = false;
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
defaultResult, bypassable) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
}
});
}
/**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
* @throws IOException e
*/
public boolean postCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Delete delete,
boolean result) throws IOException {
public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
CheckAndMutateResult result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndDelete(this, row, family,
qualifier, op, comparator, delete, getResult());
}
});
}
/**
* @param row row to check
* @param filter filter
* @param delete delete to commit if check succeeds
* @throws IOException e
*/
public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
boolean result) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
regionObserverGetter, result) {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postCheckAndDelete(this, row, filter, delete, getResult());
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
return observer.postCheckAndMutate(this, checkAndMutate, getResult());
}
});
}

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@ -113,6 +115,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndMutate = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndMutateAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndMutate = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@ -583,6 +588,28 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return true;
}
@Override
public CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
ctPreCheckAndMutate.incrementAndGet();
return RegionObserver.super.preCheckAndMutate(c, checkAndMutate, result);
}
@Override
public CheckAndMutateResult preCheckAndMutateAfterRowLock(
ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
CheckAndMutateResult result) throws IOException {
ctPreCheckAndMutateAfterRowLock.incrementAndGet();
return RegionObserver.super.preCheckAndMutateAfterRowLock(c, checkAndMutate, result);
}
@Override
public CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
ctPostCheckAndMutate.incrementAndGet();
return RegionObserver.super.postCheckAndMutate(c, checkAndMutate, result);
}
@Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Append append) throws IOException {
@ -826,6 +853,18 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostCheckAndDeleteWithFilter.get();
}
public int getPreCheckAndMutate() {
return ctPreCheckAndMutate.get();
}
public int getPreCheckAndMutateAfterRowLock() {
return ctPreCheckAndMutateAfterRowLock.get();
}
public int getPostCheckAndMutate() {
return ctPostCheckAndMutate.get();
}
public boolean hadPreIncrement() {
return ctPreIncrement.get() > 0;
}

View File

@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -265,15 +267,17 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@ -281,8 +285,9 @@ public class TestRegionObserverInterface {
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
"getPostCheckAndPutWithFilter" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
} finally {
util.deleteTable(tableName);
}
@ -302,16 +307,18 @@ public class TestRegionObserverInterface {
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
table.checkAndMutate(Bytes.toBytes(0),
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@ -320,8 +327,50 @@ public class TestRegionObserverInterface {
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
"getPostCheckAndDeleteWithFilter" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
} finally {
util.deleteTable(tableName);
table.close();
}
}
@Test
public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
name.getMethodName());
Table table = util.createTable(tableName, new byte[][] { A, B, C });
try {
byte[] row = Bytes.toBytes(0);
Put p = new Put(row).addColumn(A, A, A);
table.put(p);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 0, 0, 0 });
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(A, A, A)
.build(new RowMutations(row)
.add((Mutation) new Put(row).addColumn(B, B, B))
.add((Mutation) new Delete(row))));
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 1, 1, 1 });
Object[] result = new Object[2];
table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row)
.ifEquals(A, A, A)
.build(new RowMutations(row)
.add((Mutation) new Put(row).addColumn(B, B, B))
.add((Mutation) new Delete(row)))), result);
verifyMethodResult(
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
tableName, new Integer[] { 2, 2, 2 });
} finally {
util.deleteTable(tableName);
table.close();

View File

@ -97,6 +97,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
@ -1770,6 +1772,7 @@ public class TestHRegion {
// checkAndMutate tests
// ////////////////////////////////////////////////////////////////////////////
@Test
@Deprecated
public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -1839,6 +1842,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_WithWrongValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -1888,6 +1892,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_WithCorrectValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -1936,6 +1941,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -2025,6 +2031,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndPut_ThatPutWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -2066,6 +2073,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndPut_wrongRowInPut() throws IOException {
this.region = initHRegion(tableName, method, CONF, COLUMNS);
Put put = new Put(row2);
@ -2080,6 +2088,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
@ -2153,6 +2162,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_WithFilters() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
@ -2227,6 +2237,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
@ -2275,6 +2286,7 @@ public class TestHRegion {
}
@Test
@Deprecated
public void testCheckAndMutate_wrongMutationType() throws Throwable {
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
@ -2284,7 +2296,7 @@ public class TestHRegion {
new Increment(row).addColumn(fam1, qual1, 1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action must be Put or Delete", e.getMessage());
assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
}
try {
@ -2293,11 +2305,12 @@ public class TestHRegion {
new Increment(row).addColumn(fam1, qual1, 1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action must be Put or Delete", e.getMessage());
assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
}
}
@Test
@Deprecated
public void testCheckAndMutate_wrongRow() throws Throwable {
final byte[] wrongRow = Bytes.toBytes("wrongRow");
@ -2309,7 +2322,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action's getRow must match", e.getMessage());
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
}
try {
@ -2318,7 +2332,8 @@ public class TestHRegion {
new Put(wrongRow).addColumn(fam1, qual1, value1));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action's getRow must match", e.getMessage());
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
}
try {
@ -2330,7 +2345,8 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action's getRow must match", e.getMessage());
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
}
try {
@ -2342,10 +2358,494 @@ public class TestHRegion {
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
fail("should throw DoNotRetryIOException");
} catch (DoNotRetryIOException e) {
assertEquals("Action's getRow must match", e.getMessage());
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+ "match the original one <rowA>", e.getMessage());
}
}
@Test
public void testCheckAndMutateWithEmptyRowValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] qf1 = Bytes.toBytes("qualifier");
byte[] emptyVal = new byte[] {};
byte[] val1 = Bytes.toBytes("value1");
byte[] val2 = Bytes.toBytes("value2");
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
// Putting empty data in key
Put put = new Put(row1);
put.addColumn(fam1, qf1, emptyVal);
// checkAndPut with empty value
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertTrue(res.isSuccess());
// Putting data in key
put = new Put(row1);
put.addColumn(fam1, qf1, val1);
// checkAndPut with correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertTrue(res.isSuccess());
// not empty anymore
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
assertFalse(res.isSuccess());
Delete delete = new Delete(row1);
delete.addColumn(fam1, qf1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertFalse(res.isSuccess());
put = new Put(row1);
put.addColumn(fam1, qf1, val2);
// checkAndPut with correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
assertTrue(res.isSuccess());
// checkAndDelete with correct value
delete = new Delete(row1);
delete.addColumn(fam1, qf1);
delete.addColumn(fam1, qf1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
assertTrue(res.isSuccess());
delete = new Delete(row1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertTrue(res.isSuccess());
// checkAndPut looking for a null value
put = new Put(row1);
put.addColumn(fam1, qf1, val1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
.build(put));
assertTrue(res.isSuccess());
}
@Test
public void testCheckAndMutateWithWrongValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] qf1 = Bytes.toBytes("qualifier");
byte[] val1 = Bytes.toBytes("value1");
byte[] val2 = Bytes.toBytes("value2");
BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
// Putting data in key
Put put = new Put(row1);
put.addColumn(fam1, qf1, val1);
region.put(put);
// checkAndPut with wrong value
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
assertFalse(res.isSuccess());
// checkAndDelete with wrong value
Delete delete = new Delete(row1);
delete.addFamily(fam1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
assertFalse(res.isSuccess());
// Putting data in key
put = new Put(row1);
put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
region.put(put);
// checkAndPut with wrong value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put));
assertFalse(res.isSuccess());
// checkAndDelete with wrong value
delete = new Delete(row1);
delete.addFamily(fam1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete));
assertFalse(res.isSuccess());
}
@Test
public void testCheckAndMutateWithCorrectValue() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] qf1 = Bytes.toBytes("qualifier");
byte[] val1 = Bytes.toBytes("value1");
BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
// Putting data in key
long now = System.currentTimeMillis();
Put put = new Put(row1);
put.addColumn(fam1, qf1, now, val1);
region.put(put);
// checkAndPut with correct value
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
assertTrue("First", res.isSuccess());
// checkAndDelete with correct value
Delete delete = new Delete(row1, now + 1);
delete.addColumn(fam1, qf1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
assertTrue("Delete", res.isSuccess());
// Putting data in key
put = new Put(row1);
put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
region.put(put);
// checkAndPut with correct value
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put));
assertTrue("Second put", res.isSuccess());
// checkAndDelete with correct value
delete = new Delete(row1, now + 3);
delete.addColumn(fam1, qf1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete));
assertTrue("Second delete", res.isSuccess());
}
@Test
public void testCheckAndMutateWithNonEqualCompareOp() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] qf1 = Bytes.toBytes("qualifier");
byte[] val1 = Bytes.toBytes("value1");
byte[] val2 = Bytes.toBytes("value2");
byte[] val3 = Bytes.toBytes("value3");
byte[] val4 = Bytes.toBytes("value4");
// Setting up region
this.region = initHRegion(tableName, method, CONF, fam1);
// Putting val3 in key
Put put = new Put(row1);
put.addColumn(fam1, qf1, val3);
region.put(put);
// Test CompareOp.LESS: original = val3, compare with val3, fail
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.LESS: original = val3, compare with val4, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.LESS: original = val3, compare with val2,
// succeed (now value = val2)
put = new Put(row1);
put.addColumn(fam1, qf1, val2);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
assertTrue(res.isSuccess());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
// succeed (value still = val2)
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
assertTrue(res.isSuccess());
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
// succeed (now value = val3)
put = new Put(row1);
put.addColumn(fam1, qf1, val3);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
assertTrue(res.isSuccess());
// Test CompareOp.GREATER: original = val3, compare with val3, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.GREATER: original = val3, compare with val2, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.GREATER: original = val3, compare with val4,
// succeed (now value = val2)
put = new Put(row1);
put.addColumn(fam1, qf1, val2);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
assertTrue(res.isSuccess());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put));
assertFalse(res.isSuccess());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
// succeed (value still = val2)
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put));
assertTrue(res.isSuccess());
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put));
assertTrue(res.isSuccess());
}
@Test
public void testCheckAndPutThatPutWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] fam2 = Bytes.toBytes("fam2");
byte[] qf1 = Bytes.toBytes("qualifier");
byte[] val1 = Bytes.toBytes("value1");
byte[] val2 = Bytes.toBytes("value2");
byte[][] families = { fam1, fam2 };
// Setting up region
this.region = initHRegion(tableName, method, CONF, families);
// Putting data in the key to check
Put put = new Put(row1);
put.addColumn(fam1, qf1, val1);
region.put(put);
// Creating put to add
long ts = System.currentTimeMillis();
KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
put = new Put(row1);
put.add(kv);
// checkAndPut with wrong value
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
assertTrue(res.isSuccess());
Get get = new Get(row1);
get.addColumn(fam2, qf1);
Cell[] actual = region.get(get).rawCells();
Cell[] expected = { kv };
assertEquals(expected.length, actual.length);
for (int i = 0; i < actual.length; i++) {
assertEquals(expected[i], actual[i]);
}
}
@Test
public void testCheckAndDeleteThatDeleteWasWritten() throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] fam1 = Bytes.toBytes("fam1");
byte[] fam2 = Bytes.toBytes("fam2");
byte[] qf1 = Bytes.toBytes("qualifier1");
byte[] qf2 = Bytes.toBytes("qualifier2");
byte[] qf3 = Bytes.toBytes("qualifier3");
byte[] val1 = Bytes.toBytes("value1");
byte[] val2 = Bytes.toBytes("value2");
byte[] val3 = Bytes.toBytes("value3");
byte[] emptyVal = new byte[] {};
byte[][] families = { fam1, fam2 };
// Setting up region
this.region = initHRegion(tableName, method, CONF, families);
// Put content
Put put = new Put(row1);
put.addColumn(fam1, qf1, val1);
region.put(put);
Threads.sleep(2);
put = new Put(row1);
put.addColumn(fam1, qf1, val2);
put.addColumn(fam2, qf1, val3);
put.addColumn(fam2, qf2, val2);
put.addColumn(fam2, qf3, val1);
put.addColumn(fam1, qf3, val1);
region.put(put);
LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
// Multi-column delete
Delete delete = new Delete(row1);
delete.addColumn(fam1, qf1);
delete.addColumn(fam2, qf1);
delete.addColumn(fam1, qf3);
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
assertTrue(res.isSuccess());
Get get = new Get(row1);
get.addColumn(fam1, qf1);
get.addColumn(fam1, qf3);
get.addColumn(fam2, qf2);
Result r = region.get(get);
assertEquals(2, r.size());
assertArrayEquals(val1, r.getValue(fam1, qf1));
assertArrayEquals(val2, r.getValue(fam2, qf2));
// Family delete
delete = new Delete(row1);
delete.addFamily(fam2);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
assertTrue(res.isSuccess());
get = new Get(row1);
r = region.get(get);
assertEquals(1, r.size());
assertArrayEquals(val1, r.getValue(fam1, qf1));
// Row delete
delete = new Delete(row1);
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
assertTrue(res.isSuccess());
get = new Get(row1);
r = region.get(get);
assertEquals(0, r.size());
}
@Test
public void testCheckAndMutateWithFilters() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
region.put(put);
// Put with success
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
assertTrue(res.isSuccess());
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
assertFalse(res.isSuccess());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
// Delete with success
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
assertTrue(res.isSuccess());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
// Mutate with success
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(res.isSuccess());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
}
@Test
public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable {
final byte[] FAMILY = Bytes.toBytes("fam");
// Setting up region
this.region = initHRegion(tableName, method, CONF, FAMILY);
// Put with specifying the timestamp
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
// Put with success
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
assertTrue(res.isSuccess());
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
assertFalse(res.isSuccess());
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
// Mutate with success
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
assertTrue(res.isSuccess());
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
}
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////

View File

@ -153,6 +153,7 @@ public class TestMetricsRegionServer {
rsm.updateDelete(null, 17);
rsm.updateCheckAndDelete(17);
rsm.updateCheckAndPut(17);
rsm.updateCheckAndMutate(17);
}
HELPER.assertCounter("appendNumOps", 24, serverSource);
@ -164,7 +165,7 @@ public class TestMetricsRegionServer {
HELPER.assertCounter("deleteNumOps", 17, serverSource);
HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource);
HELPER.assertCounter("checkAndPutNumOps", 17, serverSource);
HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource);
HELPER.assertCounter("slowAppendCount", 12, serverSource);
HELPER.assertCounter("slowDeleteCount", 13, serverSource);