HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
ebf493f075
commit
22bf9a38c9
|
@ -214,7 +214,7 @@ public final class CheckAndMutate extends Mutation {
|
|||
this.op = op;
|
||||
this.value = value;
|
||||
this.filter = null;
|
||||
this.timeRange = timeRange;
|
||||
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
|
@ -225,7 +225,7 @@ public final class CheckAndMutate extends Mutation {
|
|||
this.op = null;
|
||||
this.value = null;
|
||||
this.filter = filter;
|
||||
this.timeRange = timeRange;
|
||||
this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
|
@ -264,6 +264,13 @@ public final class CheckAndMutate extends Mutation {
|
|||
return filter;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this has a filter or not
|
||||
*/
|
||||
public boolean hasFilter() {
|
||||
return filter != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the time range to check
|
||||
*/
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.Cell.Type;
|
|||
import org.apache.hadoop.hbase.CellBuilderType;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilder;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||
|
@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.ClientUtil;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
|
@ -84,6 +86,7 @@ import org.apache.hadoop.hbase.client.RegionLoadStats;
|
|||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.RegionStatesCount;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.SlowLogParams;
|
||||
import org.apache.hadoop.hbase.client.SnapshotDescription;
|
||||
|
@ -3469,4 +3472,72 @@ public final class ProtobufUtil {
|
|||
return clearSlowLogResponses.getIsCleaned();
|
||||
}
|
||||
|
||||
public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
|
||||
MutationProto mutation, CellScanner cellScanner) throws IOException {
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
|
||||
Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
if (filter != null) {
|
||||
builder.ifMatches(filter);
|
||||
} else {
|
||||
builder.ifMatches(condition.getFamily().toByteArray(),
|
||||
condition.getQualifier().toByteArray(),
|
||||
CompareOperator.valueOf(condition.getCompareType().name()),
|
||||
ProtobufUtil.toComparator(condition.getComparator()).getValue());
|
||||
}
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
|
||||
builder.timeRange(timeRange);
|
||||
|
||||
try {
|
||||
MutationType type = mutation.getMutateType();
|
||||
switch (type) {
|
||||
case PUT:
|
||||
return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
|
||||
case DELETE:
|
||||
return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
|
||||
default:
|
||||
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
|
||||
List<Mutation> mutations) throws IOException {
|
||||
assert mutations.size() > 0;
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
|
||||
Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
if (filter != null) {
|
||||
builder.ifMatches(filter);
|
||||
} else {
|
||||
builder.ifMatches(condition.getFamily().toByteArray(),
|
||||
condition.getQualifier().toByteArray(),
|
||||
CompareOperator.valueOf(condition.getCompareType().name()),
|
||||
ProtobufUtil.toComparator(condition.getComparator()).getValue());
|
||||
}
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
|
||||
builder.timeRange(timeRange);
|
||||
|
||||
try {
|
||||
if (mutations.size() == 1) {
|
||||
Mutation m = mutations.get(0);
|
||||
if (m instanceof Put) {
|
||||
return builder.build((Put) m);
|
||||
} else if (m instanceof Delete) {
|
||||
return builder.build((Delete) m);
|
||||
} else {
|
||||
throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
|
||||
.getClass().getSimpleName().toUpperCase());
|
||||
}
|
||||
} else {
|
||||
return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,6 +86,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
|
|||
*/
|
||||
void updateCheckAndPut(long t);
|
||||
|
||||
/**
|
||||
* Update checkAndMutate histogram
|
||||
* @param t time it took
|
||||
*/
|
||||
void updateCheckAndMutate(long t);
|
||||
|
||||
/**
|
||||
* Update the Get time histogram .
|
||||
*
|
||||
|
@ -393,6 +399,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
|
|||
String DELETE_KEY = "delete";
|
||||
String CHECK_AND_DELETE_KEY = "checkAndDelete";
|
||||
String CHECK_AND_PUT_KEY = "checkAndPut";
|
||||
String CHECK_AND_MUTATE_KEY = "checkAndMutate";
|
||||
String DELETE_BATCH_KEY = "deleteBatch";
|
||||
String GET_SIZE_KEY = "getSize";
|
||||
String GET_KEY = "get";
|
||||
|
|
|
@ -42,6 +42,7 @@ public class MetricsRegionServerSourceImpl
|
|||
private final MetricHistogram deleteBatchHisto;
|
||||
private final MetricHistogram checkAndDeleteHisto;
|
||||
private final MetricHistogram checkAndPutHisto;
|
||||
private final MetricHistogram checkAndMutateHisto;
|
||||
private final MetricHistogram getHisto;
|
||||
private final MetricHistogram incrementHisto;
|
||||
private final MetricHistogram appendHisto;
|
||||
|
@ -112,6 +113,7 @@ public class MetricsRegionServerSourceImpl
|
|||
deleteBatchHisto = getMetricsRegistry().newTimeHistogram(DELETE_BATCH_KEY);
|
||||
checkAndDeleteHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_DELETE_KEY);
|
||||
checkAndPutHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_PUT_KEY);
|
||||
checkAndMutateHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_MUTATE_KEY);
|
||||
|
||||
getHisto = getMetricsRegistry().newTimeHistogram(GET_KEY);
|
||||
slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0L);
|
||||
|
@ -614,6 +616,11 @@ public class MetricsRegionServerSourceImpl
|
|||
checkAndPutHisto.add(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateCheckAndMutate(long t) {
|
||||
checkAndMutateHisto.add(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePutBatch(long t) {
|
||||
putBatchHisto.add(t);
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
|
@ -514,7 +517,11 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
|
||||
boolean result) throws IOException {
|
||||
|
@ -535,7 +542,11 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
|
@ -562,7 +573,12 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
|
||||
* instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, Put put, boolean result) throws IOException {
|
||||
|
@ -587,7 +603,12 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
|
||||
* instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
|
@ -607,7 +628,11 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result from the checkAndPut
|
||||
* @return the possibly transformed return value to return to client
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
|
||||
boolean result) throws IOException {
|
||||
|
@ -625,7 +650,11 @@ public interface RegionObserver {
|
|||
* @param put data to put if check succeeds
|
||||
* @param result from the checkAndPut
|
||||
* @return the possibly transformed return value to return to client
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Put put, boolean result) throws IOException {
|
||||
return result;
|
||||
|
@ -648,7 +677,11 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
|
||||
Delete delete, boolean result) throws IOException {
|
||||
|
@ -669,7 +702,11 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
|
@ -696,7 +733,12 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
|
||||
* instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
|
||||
|
@ -721,7 +763,12 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
|
||||
* instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
|
@ -741,7 +788,11 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result from the CheckAndDelete
|
||||
* @return the possibly transformed returned value to return to client
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
|
||||
Delete delete, boolean result) throws IOException {
|
||||
|
@ -759,12 +810,150 @@ public interface RegionObserver {
|
|||
* @param delete delete to commit if check succeeds
|
||||
* @param result from the CheckAndDelete
|
||||
* @return the possibly transformed returned value to return to client
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
|
||||
Filter filter, Delete delete, boolean result) throws IOException {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndMutate
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in actions beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @param result the default value of the result
|
||||
* @return the return value to return to client if bypassing default processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
default CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
|
||||
if (checkAndMutate.getAction() instanceof Put) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
|
||||
(Put) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
|
||||
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
|
||||
new BinaryComparator(checkAndMutate.getValue()), (Put) checkAndMutate.getAction(),
|
||||
result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
} else if (checkAndMutate.getAction() instanceof Delete) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
|
||||
(Delete) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
|
||||
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
|
||||
new BinaryComparator(checkAndMutate.getValue()), (Delete) checkAndMutate.getAction(),
|
||||
result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before checkAndDelete but after acquiring rowlock.
|
||||
* <p>
|
||||
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
|
||||
* Row will be locked for longer time. Trying to acquire lock on another row, within this,
|
||||
* can lead to potential deadlock.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#bypass to skip default actions.
|
||||
* If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in actions beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @param result the default value of the result
|
||||
* @return the value to return to client if bypassing default processing
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
default CheckAndMutateResult preCheckAndMutateAfterRowLock(
|
||||
ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
|
||||
CheckAndMutateResult result) throws IOException {
|
||||
if (checkAndMutate.getAction() instanceof Put) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
|
||||
(Put) checkAndMutate.getAction(), result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
} else if (checkAndMutate.getAction() instanceof Delete) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
|
||||
(Delete) checkAndMutate.getAction(), result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after checkAndMutate
|
||||
* <p>
|
||||
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
|
||||
* If need a Cell reference for later use, copy the cell and use that.
|
||||
* @param c the environment provided by the region server
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @param result from the checkAndMutate
|
||||
* @return the possibly transformed returned value to return to client
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
default CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
|
||||
if (checkAndMutate.getAction() instanceof Put) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = postCheckAndPut(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = postCheckAndPut(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
|
||||
(Put) checkAndMutate.getAction(), result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
} else if (checkAndMutate.getAction() instanceof Delete) {
|
||||
boolean success;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
success = postCheckAndDelete(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
|
||||
} else {
|
||||
success = postCheckAndDelete(c, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
|
||||
(Delete) checkAndMutate.getAction(), result.isSuccess());
|
||||
}
|
||||
return new CheckAndMutateResult(success, null);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before Append.
|
||||
* <p>
|
||||
|
|
|
@ -103,6 +103,8 @@ import org.apache.hadoop.hbase.Tag;
|
|||
import org.apache.hadoop.hbase.TagUtil;
|
||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.CompactionState;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
|
@ -129,6 +131,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
|||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterWrapper;
|
||||
|
@ -4249,43 +4252,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
|
||||
mutation);
|
||||
CheckAndMutate checkAndMutate;
|
||||
try {
|
||||
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange);
|
||||
if (mutation instanceof Put) {
|
||||
checkAndMutate = builder.build((Put) mutation);
|
||||
} else if (mutation instanceof Delete) {
|
||||
checkAndMutate = builder.build((Delete) mutation);
|
||||
} else {
|
||||
throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
|
||||
.getSimpleName().toUpperCase());
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
return checkAndMutate(checkAndMutate).isSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
|
||||
throws IOException {
|
||||
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
|
||||
CheckAndMutate checkAndMutate;
|
||||
try {
|
||||
CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row).ifMatches(filter)
|
||||
.timeRange(timeRange);
|
||||
if (mutation instanceof Put) {
|
||||
checkAndMutate = builder.build((Put) mutation);
|
||||
} else if (mutation instanceof Delete) {
|
||||
checkAndMutate = builder.build((Delete) mutation);
|
||||
} else {
|
||||
throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
|
||||
.getSimpleName().toUpperCase());
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
return checkAndMutate(checkAndMutate).isSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
|
||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
|
||||
CheckAndMutate checkAndMutate;
|
||||
try {
|
||||
checkAndMutate = CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange).build(rm);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
return checkAndMutate(checkAndMutate).isSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
|
||||
throws IOException {
|
||||
return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
|
||||
CheckAndMutate checkAndMutate;
|
||||
try {
|
||||
checkAndMutate = CheckAndMutate.newBuilder(row).ifMatches(filter).timeRange(timeRange)
|
||||
.build(rm);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new DoNotRetryIOException(e.getMessage());
|
||||
}
|
||||
return checkAndMutate(checkAndMutate).isSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
||||
byte[] row = checkAndMutate.getRow();
|
||||
Filter filter = null;
|
||||
byte[] family = null;
|
||||
byte[] qualifier = null;
|
||||
CompareOperator op = null;
|
||||
ByteArrayComparable comparator = null;
|
||||
if (checkAndMutate.hasFilter()) {
|
||||
filter = checkAndMutate.getFilter();
|
||||
} else {
|
||||
family = checkAndMutate.getFamily();
|
||||
qualifier = checkAndMutate.getQualifier();
|
||||
op = checkAndMutate.getCompareOp();
|
||||
comparator = new BinaryComparator(checkAndMutate.getValue());
|
||||
}
|
||||
TimeRange timeRange = checkAndMutate.getTimeRange();
|
||||
|
||||
Mutation mutation = null;
|
||||
RowMutations rowMutations = null;
|
||||
if (checkAndMutate.getAction() instanceof Mutation) {
|
||||
mutation = (Mutation) checkAndMutate.getAction();
|
||||
} else {
|
||||
rowMutations = (RowMutations) checkAndMutate.getAction();
|
||||
}
|
||||
|
||||
/**
|
||||
* checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
|
||||
* switches in the few places where there is deviation.
|
||||
*/
|
||||
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
|
||||
RowMutations rowMutations, Mutation mutation)
|
||||
throws IOException {
|
||||
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
|
||||
// One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
|
||||
// need these commented out checks.
|
||||
// if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
|
||||
// if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
|
||||
if (mutation != null) {
|
||||
checkMutationType(mutation);
|
||||
checkRow(mutation, row);
|
||||
|
@ -4312,32 +4375,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
checkRow(row, "doCheckAndRowMutate");
|
||||
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
|
||||
try {
|
||||
if (mutation != null && this.getCoprocessorHost() != null) {
|
||||
// Call coprocessor.
|
||||
Boolean processed = null;
|
||||
if (mutation instanceof Put) {
|
||||
if (filter != null) {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
|
||||
} else {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
|
||||
(Put) mutation);
|
||||
}
|
||||
} else if (mutation instanceof Delete) {
|
||||
if (filter != null) {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
|
||||
} else {
|
||||
processed = this.getCoprocessorHost()
|
||||
.preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
|
||||
(Delete) mutation);
|
||||
}
|
||||
}
|
||||
if (processed != null) {
|
||||
return processed;
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
CheckAndMutateResult result =
|
||||
getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: We used to wait here until mvcc caught up: mvcc.await();
|
||||
// Supposition is that now all changes are done under row locks, then when we go to read,
|
||||
// we'll get the latest on this row.
|
||||
|
@ -4395,10 +4440,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
mutateRow(rowMutations);
|
||||
}
|
||||
this.checkAndMutateChecksPassed.increment();
|
||||
return true;
|
||||
return new CheckAndMutateResult(true, null);
|
||||
}
|
||||
this.checkAndMutateChecksFailed.increment();
|
||||
return false;
|
||||
return new CheckAndMutateResult(false, null);
|
||||
} finally {
|
||||
rowLock.release();
|
||||
}
|
||||
|
|
|
@ -153,6 +153,10 @@ public class MetricsRegionServer {
|
|||
serverSource.updateCheckAndPut(t);
|
||||
}
|
||||
|
||||
public void updateCheckAndMutate(long t) {
|
||||
serverSource.updateCheckAndMutate(t);
|
||||
}
|
||||
|
||||
public void updateGet(TableName tn, long t) {
|
||||
if (tableMetrics != null && tn != null) {
|
||||
tableMetrics.updateGet(tn, t);
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellScannable;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
@ -67,6 +66,8 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
|
@ -78,7 +79,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
|||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Row;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||
|
@ -87,10 +87,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
|||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
|
@ -620,62 +617,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutate a list of rows atomically.
|
||||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||
*/
|
||||
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
||||
CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
|
||||
RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
|
||||
CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
throws IOException {
|
||||
int countOfCompleteMutation = 0;
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
|
||||
}
|
||||
RowMutations rm = null;
|
||||
int i = 0;
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
List<Mutation> mutations = new ArrayList<>();
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
if (rm == null) {
|
||||
rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
|
||||
}
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, put);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
||||
rm.add(put);
|
||||
mutations.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
|
||||
rm.add(del);
|
||||
mutations.add(del);
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
|
||||
}
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through client's
|
||||
// AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i++);
|
||||
builder.addResultOrException(
|
||||
resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
|
||||
if (filter != null) {
|
||||
return region.checkAndRowMutate(row, filter, timeRange, rm);
|
||||
if (mutations.size() == 0) {
|
||||
return new CheckAndMutateResult(true, null);
|
||||
} else {
|
||||
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
|
||||
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
|
||||
CheckAndMutateResult result = null;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
|
||||
}
|
||||
if (result == null) {
|
||||
result = region.checkAndMutate(checkAndMutate);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
} finally {
|
||||
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
||||
|
@ -2804,26 +2794,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
try {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier =
|
||||
condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) :
|
||||
null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter =
|
||||
condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
boolean processed =
|
||||
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
|
||||
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
|
||||
spaceQuotaEnforcement);
|
||||
responseBuilder.setProcessed(processed);
|
||||
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
|
||||
cellScanner, request.getCondition(), spaceQuotaEnforcement);
|
||||
responseBuilder.setProcessed(result.isSuccess());
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (int i = 0; i < regionAction.getActionCount(); i++) {
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through
|
||||
// client's AsyncProcess we have to add an empty result instance per operation
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i);
|
||||
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
rpcServer.getMetrics().exception(e);
|
||||
// As it's an atomic operation with a condition, we may expect it's a global failure.
|
||||
|
@ -2865,80 +2847,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
try {
|
||||
if (regionAction.hasCondition()) {
|
||||
try {
|
||||
Condition condition = regionAction.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
|
||||
|
||||
boolean processed;
|
||||
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
|
||||
// RowMutations
|
||||
processed =
|
||||
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
|
||||
qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
|
||||
spaceQuotaEnforcement);
|
||||
} else {
|
||||
if (regionAction.getActionList().isEmpty()) {
|
||||
// If the region action list is empty, do nothing.
|
||||
regionActionResultBuilder.setProcessed(true);
|
||||
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
|
||||
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
|
||||
regionActionResultBuilder.setProcessed(result.isSuccess());
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
for (int i = 0; i < regionAction.getActionCount(); i++) {
|
||||
if (i == 0 && result.getResult() != null) {
|
||||
resultOrExceptionOrBuilder.setIndex(i);
|
||||
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
|
||||
.setResult(ProtobufUtil.toResult(result.getResult())).build());
|
||||
continue;
|
||||
}
|
||||
Action action = regionAction.getAction(0);
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
|
||||
+ action.getGet());
|
||||
}
|
||||
MutationProto mutation = action.getMutation();
|
||||
switch (mutation.getMutateType()) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
// Throws an exception when violated
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
||||
quota.addMutation(put);
|
||||
|
||||
if (filter != null) {
|
||||
processed = region.checkAndMutate(row, filter, timeRange, put);
|
||||
} else {
|
||||
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
|
||||
timeRange, put);
|
||||
}
|
||||
break;
|
||||
|
||||
case DELETE:
|
||||
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, delete);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
|
||||
quota.addMutation(delete);
|
||||
|
||||
if (filter != null) {
|
||||
processed = region.checkAndMutate(row, filter, timeRange, delete);
|
||||
} else {
|
||||
processed = region.checkAndMutate(row, family, qualifier, op, comparator,
|
||||
timeRange, delete);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new DoNotRetryIOException("CheckAndMutate doesn't support "
|
||||
+ mutation.getMutateType());
|
||||
}
|
||||
|
||||
// To unify the response format with doNonAtomicRegionMutation and read through
|
||||
// client's AsyncProcess we have to add an empty result instance per operation
|
||||
regionActionResultBuilder.addResultOrException(
|
||||
ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
|
||||
resultOrExceptionOrBuilder.clear();
|
||||
resultOrExceptionOrBuilder.setIndex(i);
|
||||
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
|
||||
}
|
||||
regionActionResultBuilder.setProcessed(processed);
|
||||
} catch (IOException e) {
|
||||
rpcServer.getMetrics().exception(e);
|
||||
// As it's an atomic operation with a condition, we may expect it's a global failure.
|
||||
|
@ -3039,10 +2965,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
|
||||
OperationQuota quota = null;
|
||||
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
|
||||
ActivePolicyEnforcement spaceQuotaEnforcement = null;
|
||||
MutationType type = null;
|
||||
HRegion region = null;
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
// Clear scanner so we are not holding on to reference across call.
|
||||
if (controller != null) {
|
||||
controller.setCellScanner(null);
|
||||
|
@ -3051,20 +2973,30 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
checkOpen();
|
||||
requestCount.increment();
|
||||
rpcMutateRequestCount.increment();
|
||||
region = getRegion(request.getRegion());
|
||||
HRegion region = getRegion(request.getRegion());
|
||||
MutateResponse.Builder builder = MutateResponse.newBuilder();
|
||||
MutationProto mutation = request.getMutation();
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
|
||||
}
|
||||
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
|
||||
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
|
||||
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager()
|
||||
.getActiveEnforcements();
|
||||
|
||||
if (request.hasCondition()) {
|
||||
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
|
||||
request.getCondition(), spaceQuotaEnforcement);
|
||||
builder.setProcessed(result.isSuccess());
|
||||
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
|
||||
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
|
||||
if (clientCellBlockSupported) {
|
||||
addSize(context, result.getResult(), null);
|
||||
}
|
||||
} else {
|
||||
Result r = null;
|
||||
Boolean processed = null;
|
||||
type = mutation.getMutateType();
|
||||
|
||||
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
|
||||
spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
|
||||
|
||||
MutationType type = mutation.getMutateType();
|
||||
switch (type) {
|
||||
case APPEND:
|
||||
// TODO: this doesn't actually check anything.
|
||||
|
@ -3075,120 +3007,25 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
|
||||
break;
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
// Throws an exception when violated
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
||||
quota.addMutation(put);
|
||||
if (request.hasCondition()) {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
if (filter != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
|
||||
} else {
|
||||
processed = region.getCoprocessorHost()
|
||||
.preCheckAndPut(row, family, qualifier, op, comparator, put);
|
||||
}
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result;
|
||||
if (filter != null) {
|
||||
result = region.checkAndMutate(row, filter, timeRange, put);
|
||||
} else {
|
||||
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
|
||||
put);
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
if (filter != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
|
||||
} else {
|
||||
result = region.getCoprocessorHost()
|
||||
.postCheckAndPut(row, family, qualifier, op, comparator, put, result);
|
||||
}
|
||||
}
|
||||
processed = result;
|
||||
}
|
||||
} else {
|
||||
region.put(put);
|
||||
put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
|
||||
processed = Boolean.TRUE;
|
||||
}
|
||||
break;
|
||||
case DELETE:
|
||||
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, delete);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
|
||||
quota.addMutation(delete);
|
||||
if (request.hasCondition()) {
|
||||
Condition condition = request.getCondition();
|
||||
byte[] row = condition.getRow().toByteArray();
|
||||
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
|
||||
byte[] qualifier = condition.hasQualifier() ?
|
||||
condition.getQualifier().toByteArray() : null;
|
||||
CompareOperator op = condition.hasCompareType() ?
|
||||
CompareOperator.valueOf(condition.getCompareType().name()) : null;
|
||||
ByteArrayComparable comparator = condition.hasComparator() ?
|
||||
ProtobufUtil.toComparator(condition.getComparator()) : null;
|
||||
Filter filter = condition.hasFilter() ?
|
||||
ProtobufUtil.toFilter(condition.getFilter()) : null;
|
||||
TimeRange timeRange = condition.hasTimeRange() ?
|
||||
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||
TimeRange.allTime();
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
if (filter != null) {
|
||||
processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
|
||||
} else {
|
||||
processed = region.getCoprocessorHost()
|
||||
.preCheckAndDelete(row, family, qualifier, op, comparator, delete);
|
||||
}
|
||||
}
|
||||
if (processed == null) {
|
||||
boolean result;
|
||||
if (filter != null) {
|
||||
result = region.checkAndMutate(row, filter, timeRange, delete);
|
||||
} else {
|
||||
result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
|
||||
delete);
|
||||
}
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
if (filter != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
|
||||
result);
|
||||
} else {
|
||||
result = region.getCoprocessorHost()
|
||||
.postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
|
||||
}
|
||||
}
|
||||
processed = result;
|
||||
}
|
||||
} else {
|
||||
region.delete(delete);
|
||||
delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
|
||||
processed = Boolean.TRUE;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
|
||||
}
|
||||
if (processed != null) {
|
||||
builder.setProcessed(processed.booleanValue());
|
||||
builder.setProcessed(processed);
|
||||
}
|
||||
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
|
||||
addResult(builder, r, controller, clientCellBlockSupported);
|
||||
if (clientCellBlockSupported) {
|
||||
addSize(context, r, null);
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException ie) {
|
||||
regionServer.checkFileSystem();
|
||||
|
@ -3197,32 +3034,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (quota != null) {
|
||||
quota.close();
|
||||
}
|
||||
// Update metrics
|
||||
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
||||
if (metricsRegionServer != null && type != null) {
|
||||
}
|
||||
}
|
||||
|
||||
private void put(HRegion region, OperationQuota quota, MutationProto mutation,
|
||||
CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, put);
|
||||
spaceQuota.getPolicyEnforcement(region).check(put);
|
||||
quota.addMutation(put);
|
||||
region.put(put);
|
||||
|
||||
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
||||
if (metricsRegionServer != null) {
|
||||
long after = EnvironmentEdgeManager.currentTime();
|
||||
metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
|
||||
}
|
||||
}
|
||||
|
||||
private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
|
||||
CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
|
||||
checkCellSizeLimit(region, delete);
|
||||
spaceQuota.getPolicyEnforcement(region).check(delete);
|
||||
quota.addMutation(delete);
|
||||
region.delete(delete);
|
||||
|
||||
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
||||
if (metricsRegionServer != null) {
|
||||
long after = EnvironmentEdgeManager.currentTime();
|
||||
metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
|
||||
}
|
||||
}
|
||||
|
||||
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
|
||||
MutationProto mutation, CellScanner cellScanner, Condition condition,
|
||||
ActivePolicyEnforcement spaceQuota) throws IOException {
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
|
||||
cellScanner);
|
||||
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
|
||||
spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
|
||||
quota.addMutation((Mutation) checkAndMutate.getAction());
|
||||
|
||||
CheckAndMutateResult result = null;
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
|
||||
}
|
||||
if (result == null) {
|
||||
result = region.checkAndMutate(checkAndMutate);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
|
||||
}
|
||||
}
|
||||
MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
||||
if (metricsRegionServer != null) {
|
||||
long after = EnvironmentEdgeManager.currentTime();
|
||||
metricsRegionServer.updateCheckAndMutate(after - before);
|
||||
|
||||
MutationType type = mutation.getMutateType();
|
||||
switch (type) {
|
||||
case DELETE:
|
||||
if (request.hasCondition()) {
|
||||
metricsRegionServer.updateCheckAndDelete(after - before);
|
||||
} else {
|
||||
metricsRegionServer.updateDelete(
|
||||
region == null ? null : region.getRegionInfo().getTable(), after - before);
|
||||
}
|
||||
break;
|
||||
case PUT:
|
||||
if (request.hasCondition()) {
|
||||
metricsRegionServer.updateCheckAndPut(after - before);
|
||||
} else {
|
||||
metricsRegionServer.updatePut(
|
||||
region == null ? null : region.getRegionInfo().getTable(),after - before);
|
||||
}
|
||||
break;
|
||||
case DELETE:
|
||||
metricsRegionServer.updateCheckAndDelete(after - before);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// This is used to keep compatible with the old client implementation. Consider remove it if we
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.CellComparator;
|
|||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.CompactionState;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -308,7 +310,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param comparator the expected value
|
||||
* @param mutation data to put if check succeeds
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, Mutation mutation) throws IOException {
|
||||
return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
|
||||
|
@ -327,7 +333,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param mutation data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
|
||||
|
||||
|
@ -338,7 +348,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param filter the filter
|
||||
* @param mutation data to put if check succeeds
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
|
||||
throws IOException {
|
||||
return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
|
||||
|
@ -352,7 +366,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param mutation data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutation was applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
|
||||
throws IOException;
|
||||
|
||||
|
@ -368,7 +386,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param comparator the expected value
|
||||
* @param mutations data to put if check succeeds
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, RowMutations mutations) throws IOException {
|
||||
return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
|
||||
|
@ -388,7 +410,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param mutations data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
|
||||
throws IOException;
|
||||
|
@ -401,7 +427,11 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param filter the filter
|
||||
* @param mutations data to put if check succeeds
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
|
||||
throws IOException {
|
||||
return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
|
||||
|
@ -416,10 +446,24 @@ public interface Region extends ConfigurationObserver {
|
|||
* @param mutations data to put if check succeeds
|
||||
* @param timeRange time range to check
|
||||
* @return true if mutations were applied, false otherwise
|
||||
*
|
||||
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
|
||||
* {@link #checkAndMutate(CheckAndMutate)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
|
||||
RowMutations mutations) throws IOException;
|
||||
|
||||
/**
|
||||
* Atomically checks if a row matches the conditions and if it does, it performs the actions.
|
||||
* Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
|
||||
* time.
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @return true if mutations were applied, false otherwise
|
||||
* @throws IOException if an error occurred in this method
|
||||
*/
|
||||
CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the specified cells/row.
|
||||
* @param delete
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RawCellBuilder;
|
||||
|
@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.RawCellBuilderFactory;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SharedConnection;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
|
@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -1051,322 +1050,70 @@ public class RegionCoprocessorHost
|
|||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param put data to put if check succeeds
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public Boolean preCheckAndPut(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final CompareOperator op,
|
||||
final ByteArrayComparable comparator, final Put put)
|
||||
public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
|
||||
regionObserverGetter, defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPut(this, row, family, qualifier,
|
||||
op, comparator, put, getResult());
|
||||
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndMutate(this, checkAndMutate, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
|
||||
public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
|
||||
regionObserverGetter, defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPut(this, row, filter, put, getResult());
|
||||
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param put data to put if check succeeds
|
||||
* @param checkAndMutate the CheckAndMutate object
|
||||
* @param result the result returned by the checkAndMutate
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
* @throws IOException if an error occurred on the coprocessor
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndPutAfterRowLock(
|
||||
final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
|
||||
final ByteArrayComparable comparator, final Put put) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
|
||||
op, comparator, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndPutAfterRowLock(
|
||||
final byte[] row, final Filter filter, final Put put) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param put data to put if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndPut(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final CompareOperator op,
|
||||
final ByteArrayComparable comparator, final Put put,
|
||||
boolean result) throws IOException {
|
||||
public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
|
||||
CheckAndMutateResult result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
|
||||
regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndPut(this, row, family, qualifier,
|
||||
op, comparator, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param put data to put if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
|
||||
boolean result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndPut(this, row, filter, put, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final CompareOperator op,
|
||||
final ByteArrayComparable comparator, final Delete delete)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDelete(this, row, family,
|
||||
qualifier, op, comparator, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed, or null
|
||||
* otherwise
|
||||
*/
|
||||
public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
|
||||
throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDelete(this, row, filter, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed,
|
||||
* or null otherwise
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
|
||||
final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
|
||||
final Delete delete) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDeleteAfterRowLock(this, row,
|
||||
family, qualifier, op, comparator, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports Coprocessor 'bypass'.
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @return true or false to return to client if default processing should be bypassed,
|
||||
* or null otherwise
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
|
||||
justification="Null is legit")
|
||||
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
|
||||
final Delete delete) throws IOException {
|
||||
boolean bypassable = true;
|
||||
boolean defaultResult = false;
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
|
||||
defaultResult, bypassable) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param family column family
|
||||
* @param qualifier column qualifier
|
||||
* @param op the comparison operation
|
||||
* @param comparator the comparator
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndDelete(final byte [] row, final byte [] family,
|
||||
final byte [] qualifier, final CompareOperator op,
|
||||
final ByteArrayComparable comparator, final Delete delete,
|
||||
boolean result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndDelete(this, row, family,
|
||||
qualifier, op, comparator, delete, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row row to check
|
||||
* @param filter filter
|
||||
* @param delete delete to commit if check succeeds
|
||||
* @throws IOException e
|
||||
*/
|
||||
public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
|
||||
boolean result) throws IOException {
|
||||
if (this.coprocEnvironments.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
return execOperationWithResult(
|
||||
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
|
||||
@Override
|
||||
public Boolean call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndDelete(this, row, filter, delete, getResult());
|
||||
public CheckAndMutateResult call(RegionObserver observer) throws IOException {
|
||||
return observer.postCheckAndMutate(this, checkAndMutate, getResult());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.CompareOperator;
|
|||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -113,6 +115,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndMutate = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreCheckAndMutateAfterRowLock = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostCheckAndMutate = new AtomicInteger(0);
|
||||
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
|
||||
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
|
||||
|
@ -583,6 +588,28 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
|
||||
ctPreCheckAndMutate.incrementAndGet();
|
||||
return RegionObserver.super.preCheckAndMutate(c, checkAndMutate, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateResult preCheckAndMutateAfterRowLock(
|
||||
ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
|
||||
CheckAndMutateResult result) throws IOException {
|
||||
ctPreCheckAndMutateAfterRowLock.incrementAndGet();
|
||||
return RegionObserver.super.preCheckAndMutateAfterRowLock(c, checkAndMutate, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
|
||||
ctPostCheckAndMutate.incrementAndGet();
|
||||
return RegionObserver.super.postCheckAndMutate(c, checkAndMutate, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Append append) throws IOException {
|
||||
|
@ -826,6 +853,18 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return ctPostCheckAndDeleteWithFilter.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndMutate() {
|
||||
return ctPreCheckAndMutate.get();
|
||||
}
|
||||
|
||||
public int getPreCheckAndMutateAfterRowLock() {
|
||||
return ctPreCheckAndMutateAfterRowLock.get();
|
||||
}
|
||||
|
||||
public int getPostCheckAndMutate() {
|
||||
return ctPostCheckAndMutate.get();
|
||||
}
|
||||
|
||||
public boolean hadPreIncrement() {
|
||||
return ctPreIncrement.get() > 0;
|
||||
}
|
||||
|
|
|
@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
|
@ -267,15 +269,17 @@ public class TestRegionObserverInterface {
|
|||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
|
||||
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
|
||||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
|
||||
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0),
|
||||
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
|
||||
|
@ -283,8 +287,9 @@ public class TestRegionObserverInterface {
|
|||
verifyMethodResult(SimpleRegionObserver.class,
|
||||
new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
|
||||
"getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
|
||||
"getPostCheckAndPutWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
|
||||
"getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
|
||||
} finally {
|
||||
util.deleteTable(tableName);
|
||||
}
|
||||
|
@ -304,16 +309,18 @@ public class TestRegionObserverInterface {
|
|||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
|
||||
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
|
||||
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
|
||||
|
||||
table.checkAndMutate(Bytes.toBytes(0),
|
||||
new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
|
||||
|
@ -322,8 +329,50 @@ public class TestRegionObserverInterface {
|
|||
SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
|
||||
"getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
|
||||
"getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
|
||||
"getPostCheckAndDeleteWithFilter" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
|
||||
"getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
|
||||
} finally {
|
||||
util.deleteTable(tableName);
|
||||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
|
||||
name.getMethodName());
|
||||
Table table = util.createTable(tableName, new byte[][] { A, B, C });
|
||||
try {
|
||||
byte[] row = Bytes.toBytes(0);
|
||||
|
||||
Put p = new Put(row).addColumn(A, A, A);
|
||||
table.put(p);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 0, 0, 0 });
|
||||
|
||||
table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifEquals(A, A, A)
|
||||
.build(new RowMutations(row)
|
||||
.add((Mutation) new Put(row).addColumn(B, B, B))
|
||||
.add((Mutation) new Delete(row))));
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 1, 1, 1 });
|
||||
|
||||
Object[] result = new Object[2];
|
||||
table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row)
|
||||
.ifEquals(A, A, A)
|
||||
.build(new RowMutations(row)
|
||||
.add((Mutation) new Put(row).addColumn(B, B, B))
|
||||
.add((Mutation) new Delete(row)))), result);
|
||||
verifyMethodResult(
|
||||
SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
|
||||
"getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
|
||||
tableName, new Integer[] { 2, 2, 2 });
|
||||
} finally {
|
||||
util.deleteTable(tableName);
|
||||
table.close();
|
||||
|
|
|
@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutate;
|
||||
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
|
@ -1775,6 +1777,7 @@ public class TestHRegion {
|
|||
// checkAndMutate tests
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -1844,6 +1847,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithWrongValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -1893,6 +1897,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithCorrectValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -1941,6 +1946,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -2030,6 +2036,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndPut_ThatPutWasWritten() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -2071,6 +2078,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndPut_wrongRowInPut() throws IOException {
|
||||
this.region = initHRegion(tableName, method, CONF, COLUMNS);
|
||||
Put put = new Put(row2);
|
||||
|
@ -2085,6 +2093,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
|
@ -2158,6 +2167,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithFilters() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
|
@ -2232,6 +2242,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
|
@ -2280,6 +2291,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_wrongMutationType() throws Throwable {
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
|
@ -2289,7 +2301,7 @@ public class TestHRegion {
|
|||
new Increment(row).addColumn(fam1, qual1, 1));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action must be Put or Delete", e.getMessage());
|
||||
assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2298,11 +2310,12 @@ public class TestHRegion {
|
|||
new Increment(row).addColumn(fam1, qual1, 1));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action must be Put or Delete", e.getMessage());
|
||||
assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
public void testCheckAndMutate_wrongRow() throws Throwable {
|
||||
final byte[] wrongRow = Bytes.toBytes("wrongRow");
|
||||
|
||||
|
@ -2314,7 +2327,8 @@ public class TestHRegion {
|
|||
new Put(wrongRow).addColumn(fam1, qual1, value1));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action's getRow must match", e.getMessage());
|
||||
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
|
||||
+ "match the original one <rowA>", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2323,7 +2337,8 @@ public class TestHRegion {
|
|||
new Put(wrongRow).addColumn(fam1, qual1, value1));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action's getRow must match", e.getMessage());
|
||||
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
|
||||
+ "match the original one <rowA>", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2335,7 +2350,8 @@ public class TestHRegion {
|
|||
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action's getRow must match", e.getMessage());
|
||||
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
|
||||
+ "match the original one <rowA>", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2347,10 +2363,494 @@ public class TestHRegion {
|
|||
.add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
|
||||
fail("should throw DoNotRetryIOException");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertEquals("Action's getRow must match", e.getMessage());
|
||||
assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
|
||||
+ "match the original one <rowA>", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithEmptyRowValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier");
|
||||
byte[] emptyVal = new byte[] {};
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
byte[] val2 = Bytes.toBytes("value2");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
// Putting empty data in key
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, emptyVal);
|
||||
|
||||
// checkAndPut with empty value
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Putting data in key
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val1);
|
||||
|
||||
// checkAndPut with correct value
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// not empty anymore
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
Delete delete = new Delete(row1);
|
||||
delete.addColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val2);
|
||||
// checkAndPut with correct value
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// checkAndDelete with correct value
|
||||
delete = new Delete(row1);
|
||||
delete.addColumn(fam1, qf1);
|
||||
delete.addColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
delete = new Delete(row1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// checkAndPut looking for a null value
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val1);
|
||||
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
|
||||
.build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithWrongValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
byte[] val2 = Bytes.toBytes("value2");
|
||||
BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
|
||||
BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
// Putting data in key
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val1);
|
||||
region.put(put);
|
||||
|
||||
// checkAndPut with wrong value
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// checkAndDelete with wrong value
|
||||
Delete delete = new Delete(row1);
|
||||
delete.addFamily(fam1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Putting data in key
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
|
||||
region.put(put);
|
||||
|
||||
// checkAndPut with wrong value
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// checkAndDelete with wrong value
|
||||
delete = new Delete(row1);
|
||||
delete.addFamily(fam1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete));
|
||||
assertFalse(res.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithCorrectValue() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
// Putting data in key
|
||||
long now = System.currentTimeMillis();
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, now, val1);
|
||||
region.put(put);
|
||||
|
||||
// checkAndPut with correct value
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
|
||||
assertTrue("First", res.isSuccess());
|
||||
|
||||
// checkAndDelete with correct value
|
||||
Delete delete = new Delete(row1, now + 1);
|
||||
delete.addColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
|
||||
assertTrue("Delete", res.isSuccess());
|
||||
|
||||
// Putting data in key
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
|
||||
region.put(put);
|
||||
|
||||
// checkAndPut with correct value
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put));
|
||||
assertTrue("Second put", res.isSuccess());
|
||||
|
||||
// checkAndDelete with correct value
|
||||
delete = new Delete(row1, now + 3);
|
||||
delete.addColumn(fam1, qf1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete));
|
||||
assertTrue("Second delete", res.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithNonEqualCompareOp() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
byte[] val2 = Bytes.toBytes("value2");
|
||||
byte[] val3 = Bytes.toBytes("value3");
|
||||
byte[] val4 = Bytes.toBytes("value4");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
// Putting val3 in key
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val3);
|
||||
region.put(put);
|
||||
|
||||
// Test CompareOp.LESS: original = val3, compare with val3, fail
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.LESS: original = val3, compare with val4, fail
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.LESS: original = val3, compare with val2,
|
||||
// succeed (now value = val2)
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val2);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
|
||||
// succeed (value still = val2)
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
|
||||
// succeed (now value = val3)
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val3);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER: original = val3, compare with val3, fail
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER: original = val3, compare with val2, fail
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER: original = val3, compare with val4,
|
||||
// succeed (now value = val2)
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val2);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
|
||||
// succeed (value still = val2)
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndPutThatPutWasWritten() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] fam2 = Bytes.toBytes("fam2");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
byte[] val2 = Bytes.toBytes("value2");
|
||||
|
||||
byte[][] families = { fam1, fam2 };
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
// Putting data in the key to check
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val1);
|
||||
region.put(put);
|
||||
|
||||
// Creating put to add
|
||||
long ts = System.currentTimeMillis();
|
||||
KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
|
||||
put = new Put(row1);
|
||||
put.add(kv);
|
||||
|
||||
// checkAndPut with wrong value
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
Get get = new Get(row1);
|
||||
get.addColumn(fam2, qf1);
|
||||
Cell[] actual = region.get(get).rawCells();
|
||||
|
||||
Cell[] expected = { kv };
|
||||
|
||||
assertEquals(expected.length, actual.length);
|
||||
for (int i = 0; i < actual.length; i++) {
|
||||
assertEquals(expected[i], actual[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndDeleteThatDeleteWasWritten() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] fam2 = Bytes.toBytes("fam2");
|
||||
byte[] qf1 = Bytes.toBytes("qualifier1");
|
||||
byte[] qf2 = Bytes.toBytes("qualifier2");
|
||||
byte[] qf3 = Bytes.toBytes("qualifier3");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
byte[] val2 = Bytes.toBytes("value2");
|
||||
byte[] val3 = Bytes.toBytes("value3");
|
||||
byte[] emptyVal = new byte[] {};
|
||||
|
||||
byte[][] families = { fam1, fam2 };
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
// Put content
|
||||
Put put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val1);
|
||||
region.put(put);
|
||||
Threads.sleep(2);
|
||||
|
||||
put = new Put(row1);
|
||||
put.addColumn(fam1, qf1, val2);
|
||||
put.addColumn(fam2, qf1, val3);
|
||||
put.addColumn(fam2, qf2, val2);
|
||||
put.addColumn(fam2, qf3, val1);
|
||||
put.addColumn(fam1, qf3, val1);
|
||||
region.put(put);
|
||||
|
||||
LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
|
||||
|
||||
// Multi-column delete
|
||||
Delete delete = new Delete(row1);
|
||||
delete.addColumn(fam1, qf1);
|
||||
delete.addColumn(fam2, qf1);
|
||||
delete.addColumn(fam1, qf3);
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
Get get = new Get(row1);
|
||||
get.addColumn(fam1, qf1);
|
||||
get.addColumn(fam1, qf3);
|
||||
get.addColumn(fam2, qf2);
|
||||
Result r = region.get(get);
|
||||
assertEquals(2, r.size());
|
||||
assertArrayEquals(val1, r.getValue(fam1, qf1));
|
||||
assertArrayEquals(val2, r.getValue(fam2, qf2));
|
||||
|
||||
// Family delete
|
||||
delete = new Delete(row1);
|
||||
delete.addFamily(fam2);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
get = new Get(row1);
|
||||
r = region.get(get);
|
||||
assertEquals(1, r.size());
|
||||
assertArrayEquals(val1, r.getValue(fam1, qf1));
|
||||
|
||||
// Row delete
|
||||
delete = new Delete(row1);
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
|
||||
.ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
|
||||
assertTrue(res.isSuccess());
|
||||
get = new Get(row1);
|
||||
r = region.get(get);
|
||||
assertEquals(0, r.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFilters() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, FAMILY);
|
||||
|
||||
// Put one row
|
||||
Put put = new Put(row);
|
||||
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
|
||||
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
|
||||
region.put(put);
|
||||
|
||||
// Put with success
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))))
|
||||
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
// Put with failure
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("c"))))
|
||||
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
|
||||
|
||||
// Delete with success
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))))
|
||||
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
|
||||
|
||||
// Mutate with success
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new FilterList(
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")),
|
||||
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("b"))))
|
||||
.build(new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
|
||||
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable {
|
||||
final byte[] FAMILY = Bytes.toBytes("fam");
|
||||
|
||||
// Setting up region
|
||||
this.region = initHRegion(tableName, method, CONF, FAMILY);
|
||||
|
||||
// Put with specifying the timestamp
|
||||
region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
|
||||
|
||||
// Put with success
|
||||
CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 101))
|
||||
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
|
||||
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
|
||||
|
||||
// Put with failure
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 100))
|
||||
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
|
||||
assertFalse(res.isSuccess());
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
|
||||
|
||||
// Mutate with success
|
||||
res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
|
||||
Bytes.toBytes("a")))
|
||||
.timeRange(TimeRange.between(0, 101))
|
||||
.build(new RowMutations(row)
|
||||
.add((Mutation) new Put(row)
|
||||
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
|
||||
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
|
||||
assertTrue(res.isSuccess());
|
||||
|
||||
result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
|
||||
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
|
||||
|
||||
assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
|
||||
}
|
||||
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
// Delete tests
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -151,6 +151,7 @@ public class TestMetricsRegionServer {
|
|||
rsm.updateDelete(null, 17);
|
||||
rsm.updateCheckAndDelete(17);
|
||||
rsm.updateCheckAndPut(17);
|
||||
rsm.updateCheckAndMutate(17);
|
||||
}
|
||||
|
||||
HELPER.assertCounter("appendNumOps", 24, serverSource);
|
||||
|
@ -162,7 +163,7 @@ public class TestMetricsRegionServer {
|
|||
HELPER.assertCounter("deleteNumOps", 17, serverSource);
|
||||
HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource);
|
||||
HELPER.assertCounter("checkAndPutNumOps", 17, serverSource);
|
||||
|
||||
HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource);
|
||||
|
||||
HELPER.assertCounter("slowAppendCount", 12, serverSource);
|
||||
HELPER.assertCounter("slowDeleteCount", 13, serverSource);
|
||||
|
|
Loading…
Reference in New Issue