HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3073)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
37fa9e5e97
commit
2b5c2c36d9
|
@ -172,7 +172,7 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
} else {
|
||||
action = new Action(rawAction, i);
|
||||
}
|
||||
if (rawAction instanceof Append || rawAction instanceof Increment) {
|
||||
if (hasIncrementOrAppend(rawAction)) {
|
||||
action.setNonce(conn.getNonceGenerator().newNonce());
|
||||
}
|
||||
this.actions.add(action);
|
||||
|
@ -184,6 +184,26 @@ class AsyncBatchRpcRetryingCaller<T> {
|
|||
this.startNs = System.nanoTime();
|
||||
}
|
||||
|
||||
private static boolean hasIncrementOrAppend(Row action) {
|
||||
if (action instanceof Append || action instanceof Increment) {
|
||||
return true;
|
||||
} else if (action instanceof RowMutations) {
|
||||
return hasIncrementOrAppend((RowMutations) action);
|
||||
} else if (action instanceof CheckAndMutate) {
|
||||
return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean hasIncrementOrAppend(RowMutations mutations) {
|
||||
for (Mutation mutation : mutations.getMutations()) {
|
||||
if (mutation instanceof Append || mutation instanceof Increment) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private long remainingTimeNs() {
|
||||
return operationTimeoutNs - (System.nanoTime() - startNs);
|
||||
}
|
||||
|
|
|
@ -398,9 +398,30 @@ class AsyncProcess {
|
|||
}
|
||||
|
||||
private void setNonce(NonceGenerator ng, Row r, Action action) {
|
||||
if (!(r instanceof Append) && !(r instanceof Increment)) return;
|
||||
if (hasIncrementOrAppend(r)) {
|
||||
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean hasIncrementOrAppend(Row action) {
|
||||
if (action instanceof Append || action instanceof Increment) {
|
||||
return true;
|
||||
} else if (action instanceof RowMutations) {
|
||||
return hasIncrementOrAppend((RowMutations) action);
|
||||
} else if (action instanceof CheckAndMutate) {
|
||||
return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean hasIncrementOrAppend(RowMutations mutations) {
|
||||
for (Mutation mutation : mutations.getMutations()) {
|
||||
if (mutation instanceof Append || mutation instanceof Increment) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private int checkTimeout(String name, int timeout) {
|
||||
if (timeout < 0) {
|
||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
@ -555,17 +554,16 @@ public class HTable implements Table {
|
|||
|
||||
@Override
|
||||
public Result mutateRow(final RowMutations rm) throws IOException {
|
||||
long nonceGroup = getNonceGroup();
|
||||
long nonce = getNonce();
|
||||
CancellableRegionServerCallable<MultiResponse> callable =
|
||||
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
|
||||
rpcControllerFactory.newController(), writeRpcTimeoutMs,
|
||||
new RetryingTimeTracker().start(), rm.getMaxPriority()) {
|
||||
@Override
|
||||
protected MultiResponse rpcCall() throws Exception {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
|
||||
getLocation().getRegionInfo().getRegionName(), rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
MultiRequest request =
|
||||
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
|
||||
MultiRequest request = RequestConverter.buildMultiRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce);
|
||||
ClientProtos.MultiResponse response = doMulti(request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
|
@ -597,6 +595,14 @@ public class HTable implements Table {
|
|||
return (Result) results[0];
|
||||
}
|
||||
|
||||
private long getNonceGroup() {
|
||||
return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup();
|
||||
}
|
||||
|
||||
private long getNonce() {
|
||||
return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result append(final Append append) throws IOException {
|
||||
checkHasFamilies(append);
|
||||
|
@ -606,7 +612,8 @@ public class HTable implements Table {
|
|||
@Override
|
||||
protected Result rpcCall() throws Exception {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
|
||||
getLocation().getRegionInfo().getRegionName(), append, super.getNonceGroup(),
|
||||
super.getNonce());
|
||||
MutateResponse response = doMutate(request);
|
||||
if (!response.hasResult()) return null;
|
||||
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
|
||||
|
@ -625,7 +632,8 @@ public class HTable implements Table {
|
|||
@Override
|
||||
protected Result rpcCall() throws Exception {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
|
||||
getLocation().getRegionInfo().getRegionName(), increment, super.getNonceGroup(),
|
||||
super.getNonce());
|
||||
MutateResponse response = doMutate(request);
|
||||
// Should this check for null like append does?
|
||||
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
|
||||
|
@ -664,7 +672,7 @@ public class HTable implements Table {
|
|||
protected Long rpcCall() throws Exception {
|
||||
MutateRequest request = RequestConverter.buildIncrementRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, amount, durability, getNonceGroup(), getNonce());
|
||||
qualifier, amount, durability, super.getNonceGroup(), super.getNonce());
|
||||
MutateResponse response = doMutate(request);
|
||||
Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
|
||||
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
|
||||
|
@ -737,6 +745,8 @@ public class HTable implements Table {
|
|||
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
|
||||
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
|
||||
final TimeRange timeRange, final RowMutations rm) throws IOException {
|
||||
long nonceGroup = getNonceGroup();
|
||||
long nonce = getNonce();
|
||||
CancellableRegionServerCallable<MultiResponse> callable =
|
||||
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
||||
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
|
||||
|
@ -744,8 +754,8 @@ public class HTable implements Table {
|
|||
@Override
|
||||
protected MultiResponse rpcCall() throws Exception {
|
||||
MultiRequest request = RequestConverter
|
||||
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, op, value, filter, timeRange, rm);
|
||||
.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
|
||||
ClientProtos.MultiResponse response = doMulti(request);
|
||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||
if (res.hasException()) {
|
||||
|
@ -822,6 +832,8 @@ public class HTable implements Table {
|
|||
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
|
||||
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
|
||||
final TimeRange timeRange, final Mutation mutation) throws IOException {
|
||||
long nonceGroup = getNonceGroup();
|
||||
long nonce = getNonce();
|
||||
ClientServiceCallable<CheckAndMutateResult> callable =
|
||||
new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
|
||||
this.rpcControllerFactory.newController(), mutation.getPriority()) {
|
||||
|
@ -829,7 +841,7 @@ public class HTable implements Table {
|
|||
protected CheckAndMutateResult rpcCall() throws Exception {
|
||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
|
||||
filter, timeRange, mutation);
|
||||
filter, timeRange, mutation, nonceGroup, nonce);
|
||||
MutateResponse response = doMutate(request);
|
||||
if (response.hasResult()) {
|
||||
return new CheckAndMutateResult(response.getProcessed(),
|
||||
|
|
|
@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
|
||||
|
||||
/**
|
||||
* The implementation of RawAsyncTable.
|
||||
|
@ -362,7 +361,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, p),
|
||||
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -374,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, d),
|
||||
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -387,8 +386,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
loc, stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, rm), CheckAndMutateResult::isSuccess))
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
|
||||
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
CheckAndMutateResult::isSuccess))
|
||||
.call();
|
||||
}
|
||||
}
|
||||
|
@ -425,7 +425,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
|
||||
stub, put,
|
||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, p),
|
||||
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -436,7 +436,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
|
||||
loc, stub, delete,
|
||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, d),
|
||||
filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
(c, r) -> r.getProcessed()))
|
||||
.call();
|
||||
}
|
||||
|
@ -448,8 +448,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
|
||||
loc, stub, mutation,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, rm), CheckAndMutateResult::isSuccess))
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
|
||||
filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
|
||||
CheckAndMutateResult::isSuccess))
|
||||
.call();
|
||||
}
|
||||
}
|
||||
|
@ -468,6 +469,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
if (mutation instanceof Put) {
|
||||
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
|
||||
}
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
|
||||
mutation.getPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
|
||||
|
@ -475,21 +478,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
|
||||
checkAndMutate.getTimeRange(), m),
|
||||
checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
|
||||
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
|
||||
.call();
|
||||
} else if (checkAndMutate.getAction() instanceof RowMutations) {
|
||||
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
|
||||
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
|
||||
rowMutations.getMaxPriority(), rpcTimeoutNs)
|
||||
.action((controller, loc, stub) ->
|
||||
RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
|
||||
controller, loc, stub, rowMutations,
|
||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
|
||||
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
|
||||
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
|
||||
checkAndMutate.getTimeRange(), rm),
|
||||
checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
|
||||
resp -> resp))
|
||||
.call();
|
||||
} else {
|
||||
|
@ -554,16 +559,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
|||
@Override
|
||||
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
|
||||
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
|
||||
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
|
||||
long nonce = conn.getNonceGenerator().newNonce();
|
||||
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
|
||||
writeRpcTimeoutNs).action((controller, loc, stub) ->
|
||||
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
|
||||
(rn, rm) -> {
|
||||
RegionAction.Builder regionMutationBuilder = RequestConverter
|
||||
.buildRegionAction(rn, rm);
|
||||
regionMutationBuilder.setAtomic(true);
|
||||
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
|
||||
.build();
|
||||
}, resp -> resp))
|
||||
(rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
|
||||
resp -> resp))
|
||||
.call();
|
||||
}
|
||||
|
||||
|
|
|
@ -247,39 +247,78 @@ public final class RequestConverter {
|
|||
*/
|
||||
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
|
||||
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
|
||||
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
|
||||
return MutateRequest.newBuilder()
|
||||
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
|
||||
.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
|
||||
final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup,
|
||||
long nonce) throws IOException {
|
||||
MutateRequest.Builder builder = MutateRequest.newBuilder();
|
||||
if (mutation instanceof Increment || mutation instanceof Append) {
|
||||
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce))
|
||||
.setNonceGroup(nonceGroup);
|
||||
} else {
|
||||
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
|
||||
}
|
||||
return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
|
||||
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer MutateRequest for conditioned row mutations
|
||||
* Create a protocol buffer MultiRequest for conditioned row mutations
|
||||
*
|
||||
* @return a mutate request
|
||||
* @return a multi request
|
||||
* @throws IOException
|
||||
*/
|
||||
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
|
||||
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
|
||||
final byte[] row, final byte[] family, final byte[] qualifier,
|
||||
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
|
||||
final RowMutations rowMutations) throws IOException {
|
||||
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
|
||||
return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
|
||||
value, filter, timeRange), nonceGroup, nonce);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer MultiRequest for row mutations
|
||||
*
|
||||
* @return a multi request
|
||||
*/
|
||||
public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
|
||||
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
|
||||
return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce);
|
||||
}
|
||||
|
||||
private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
|
||||
final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce)
|
||||
throws IOException {
|
||||
RegionAction.Builder builder =
|
||||
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
|
||||
builder.setAtomic(true);
|
||||
|
||||
boolean hasNonce = false;
|
||||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
|
||||
for (Mutation mutation: rowMutations.getMutations()) {
|
||||
mutationBuilder.clear();
|
||||
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
|
||||
mutationBuilder);
|
||||
MutationProto mp;
|
||||
if (mutation instanceof Increment || mutation instanceof Append) {
|
||||
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce);
|
||||
hasNonce = true;
|
||||
} else {
|
||||
mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder);
|
||||
}
|
||||
actionBuilder.clear();
|
||||
actionBuilder.setMutation(mp);
|
||||
builder.addAction(actionBuilder.build());
|
||||
}
|
||||
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
|
||||
buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
|
||||
|
||||
if (condition != null) {
|
||||
builder.setCondition(condition);
|
||||
}
|
||||
|
||||
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
|
||||
if (hasNonce) {
|
||||
multiRequestBuilder.setNonceGroup(nonceGroup);
|
||||
}
|
||||
|
||||
return multiRequestBuilder.addRegionAction(builder.build()).build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -362,33 +401,6 @@ public final class RequestConverter {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer MultiRequest for row mutations.
|
||||
* Does not propagate Action absolute position. Does not set atomic action on the created
|
||||
* RegionAtomic. Caller should do that if wanted.
|
||||
* @param regionName
|
||||
* @param rowMutations
|
||||
* @return a data-laden RegionMutation.Builder
|
||||
* @throws IOException
|
||||
*/
|
||||
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
|
||||
final RowMutations rowMutations)
|
||||
throws IOException {
|
||||
RegionAction.Builder builder =
|
||||
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
|
||||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
|
||||
for (Mutation mutation: rowMutations.getMutations()) {
|
||||
mutationBuilder.clear();
|
||||
MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
|
||||
mutationBuilder);
|
||||
actionBuilder.clear();
|
||||
actionBuilder.setMutation(mp);
|
||||
builder.addAction(actionBuilder.build());
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static RegionAction.Builder getRegionActionBuilderWithRegion(
|
||||
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
|
||||
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
|
||||
|
@ -785,9 +797,6 @@ public final class RequestConverter {
|
|||
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
|
||||
}
|
||||
}
|
||||
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
|
||||
multiRequestBuilder.setNonceGroup(nonceGroup);
|
||||
}
|
||||
if (builder.getActionCount() > 0) {
|
||||
multiRequestBuilder.addRegionAction(builder.build());
|
||||
}
|
||||
|
@ -801,8 +810,11 @@ public final class RequestConverter {
|
|||
builder.clear();
|
||||
getRegionActionBuilderWithRegion(builder, regionName);
|
||||
|
||||
buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
|
||||
mutationBuilder);
|
||||
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(),
|
||||
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
|
||||
if (hasIncrementOrAppend) {
|
||||
hasNonce = true;
|
||||
}
|
||||
builder.setAtomic(true);
|
||||
|
||||
multiRequestBuilder.addRegionAction(builder.build());
|
||||
|
@ -836,16 +848,21 @@ public final class RequestConverter {
|
|||
} else if (cam.getAction() instanceof Increment) {
|
||||
actionBuilder.clear();
|
||||
mutationBuilder.clear();
|
||||
buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
|
||||
buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder,
|
||||
actionBuilder, mutationBuilder);
|
||||
hasNonce = true;
|
||||
} else if (cam.getAction() instanceof Append) {
|
||||
actionBuilder.clear();
|
||||
mutationBuilder.clear();
|
||||
buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
|
||||
buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder,
|
||||
actionBuilder, mutationBuilder);
|
||||
hasNonce = true;
|
||||
} else if (cam.getAction() instanceof RowMutations) {
|
||||
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
|
||||
mutationBuilder);
|
||||
boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(),
|
||||
cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
|
||||
if (hasIncrementOrAppend) {
|
||||
hasNonce = true;
|
||||
}
|
||||
builder.setAtomic(true);
|
||||
} else {
|
||||
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
|
||||
|
@ -858,6 +875,10 @@ public final class RequestConverter {
|
|||
// in the overall multiRequest.
|
||||
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
|
||||
}
|
||||
|
||||
if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
|
||||
multiRequestBuilder.setNonceGroup(nonceGroup);
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
|
||||
|
@ -907,18 +928,29 @@ public final class RequestConverter {
|
|||
MutationType.APPEND, append, mutationBuilder, nonce)));
|
||||
}
|
||||
|
||||
private static void buildNoDataRegionAction(final RowMutations rowMutations,
|
||||
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
|
||||
/**
|
||||
* @return whether or not the rowMutations has a Increment or Append
|
||||
*/
|
||||
private static boolean buildNoDataRegionAction(final RowMutations rowMutations,
|
||||
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
|
||||
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
|
||||
throws IOException {
|
||||
boolean ret = false;
|
||||
for (Mutation mutation: rowMutations.getMutations()) {
|
||||
mutationBuilder.clear();
|
||||
MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
|
||||
mutationBuilder);
|
||||
MutationProto mp;
|
||||
if (mutation instanceof Increment || mutation instanceof Append) {
|
||||
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder,
|
||||
nonce);
|
||||
ret = true;
|
||||
} else {
|
||||
mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder);
|
||||
}
|
||||
cells.add(mutation);
|
||||
actionBuilder.clear();
|
||||
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private static MutationType getMutationType(Mutation mutation) {
|
||||
|
|
|
@ -1551,7 +1551,7 @@ public interface RegionObserver {
|
|||
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
|
||||
for (Pair<Cell, Cell> pair : cellPairs) {
|
||||
resultPairs.add(new Pair<>(pair.getFirst(),
|
||||
postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
|
||||
postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(),
|
||||
pair.getSecond())));
|
||||
}
|
||||
return resultPairs;
|
||||
|
|
|
@ -3259,8 +3259,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
protected final Map<byte[], List<Cell>>[] familyCellMaps;
|
||||
// For Increment/Append operations
|
||||
protected final Result[] results;
|
||||
// For nonce operations
|
||||
protected final boolean[] canProceed;
|
||||
|
||||
protected final HRegion region;
|
||||
protected int nextIndexToProcess = 0;
|
||||
|
@ -3276,7 +3274,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.walEditsFromCoprocessors = new WALEdit[operations.length];
|
||||
familyCellMaps = new Map[operations.length];
|
||||
this.results = new Result[operations.length];
|
||||
this.canProceed = new boolean[operations.length];
|
||||
|
||||
this.region = region;
|
||||
observedExceptions = new ObservedExceptionsInBatch();
|
||||
|
@ -3698,9 +3695,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
*/
|
||||
private static class MutationBatchOperation extends BatchOperation<Mutation> {
|
||||
|
||||
// For nonce operations
|
||||
private long nonceGroup;
|
||||
|
||||
private long nonce;
|
||||
protected boolean canProceed;
|
||||
|
||||
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
|
||||
long nonceGroup, long nonce) {
|
||||
|
@ -3813,6 +3811,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
|
||||
long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
|
||||
// For nonce operations
|
||||
canProceed = startNonceOperation();
|
||||
|
||||
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
|
||||
Mutation mutation = getMutation(index);
|
||||
if (mutation instanceof Put) {
|
||||
|
@ -3831,8 +3832,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
// For nonce operations
|
||||
canProceed[index] = startNonceOperation(nonceGroup, nonce);
|
||||
if (!canProceed[index]) {
|
||||
if (!canProceed) {
|
||||
Result result;
|
||||
if (returnResults) {
|
||||
// convert duplicate increment/append to get
|
||||
|
@ -3894,11 +3894,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
/**
|
||||
* Starts the nonce operation for a mutation, if needed.
|
||||
* @param nonceGroup Nonce group from the request.
|
||||
* @param nonce Nonce.
|
||||
* @return whether to proceed this mutation.
|
||||
*/
|
||||
private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
|
||||
private boolean startNonceOperation() throws IOException {
|
||||
if (region.rsServices == null || region.rsServices.getNonceManager() == null
|
||||
|| nonce == HConstants.NO_NONCE) {
|
||||
return true;
|
||||
|
@ -3915,11 +3913,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
/**
|
||||
* Ends nonce operation for a mutation, if needed.
|
||||
* @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
|
||||
* @param nonce Nonce.
|
||||
* @param success Whether the operation for this nonce has succeeded.
|
||||
*/
|
||||
private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
|
||||
private void endNonceOperation(boolean success) {
|
||||
if (region.rsServices != null && region.rsServices.getNonceManager() != null
|
||||
&& nonce != HConstants.NO_NONCE) {
|
||||
region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
|
||||
|
@ -4188,13 +4184,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
// For nonce operations
|
||||
if (canProceed && nonce != HConstants.NO_NONCE) {
|
||||
boolean[] areAllIncrementsAndAppendsSuccessful = new boolean[]{true};
|
||||
visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
|
||||
if (canProceed[i]) {
|
||||
endNonceOperation(nonceGroup, nonce,
|
||||
retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
|
||||
Mutation mutation = getMutation(i);
|
||||
if (mutation instanceof Increment || mutation instanceof Append) {
|
||||
if (retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
|
||||
areAllIncrementsAndAppendsSuccessful[0] = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
endNonceOperation(areAllIncrementsAndAppendsSuccessful[0]);
|
||||
}
|
||||
|
||||
// See if the column families were consistent through the whole thing.
|
||||
// if they were then keep them. If they were not then pass a null.
|
||||
|
@ -4452,7 +4455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
private OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
|
||||
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
|
||||
long nonce) throws IOException {
|
||||
// As it stands, this is used for 3 things
|
||||
// * batchMutate with single mutation - put/delete/increment/append, separate or from
|
||||
|
@ -4748,6 +4751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
|
||||
return checkAndMutate(checkAndMutate, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
|
||||
long nonce) throws IOException {
|
||||
byte[] row = checkAndMutate.getRow();
|
||||
Filter filter = null;
|
||||
byte[] family = null;
|
||||
|
@ -4859,9 +4867,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// All edits for the given row (across all column families) must happen atomically.
|
||||
Result r;
|
||||
if (mutation != null) {
|
||||
r = mutate(mutation, true).getResult();
|
||||
r = mutate(mutation, true, nonceGroup, nonce).getResult();
|
||||
} else {
|
||||
r = mutateRow(rowMutations);
|
||||
r = mutateRow(rowMutations, nonceGroup, nonce);
|
||||
}
|
||||
this.checkAndMutateChecksPassed.increment();
|
||||
return new CheckAndMutateResult(true, r);
|
||||
|
@ -7517,9 +7525,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public Result mutateRow(RowMutations rm) throws IOException {
|
||||
return mutateRow(rm, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
public Result mutateRow(RowMutations rm, long nonceGroup, long nonce) throws IOException {
|
||||
final List<Mutation> m = rm.getMutations();
|
||||
OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
|
||||
HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, nonceGroup, nonce);
|
||||
|
||||
List<Result> results = new ArrayList<>();
|
||||
for (OperationStatus status : statuses) {
|
||||
|
|
|
@ -598,43 +598,47 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
|
||||
CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
throws IOException {
|
||||
CellScanner cellScanner, Condition condition, long nonceGroup,
|
||||
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
|
||||
int countOfCompleteMutation = 0;
|
||||
try {
|
||||
if (!region.getRegionInfo().isMetaRegion()) {
|
||||
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
|
||||
}
|
||||
List<Mutation> mutations = new ArrayList<>();
|
||||
long nonce = HConstants.NO_NONCE;
|
||||
for (ClientProtos.Action action: actions) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
action.getGet());
|
||||
}
|
||||
MutationType type = action.getMutation().getMutateType();
|
||||
MutationProto mutation = action.getMutation();
|
||||
MutationType type = mutation.getMutateType();
|
||||
switch (type) {
|
||||
case PUT:
|
||||
Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
|
||||
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, put);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
||||
mutations.add(put);
|
||||
break;
|
||||
case DELETE:
|
||||
Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
|
||||
Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
|
||||
++countOfCompleteMutation;
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
|
||||
mutations.add(del);
|
||||
break;
|
||||
case INCREMENT:
|
||||
Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
|
||||
Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
|
||||
nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, increment);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
|
||||
mutations.add(increment);
|
||||
break;
|
||||
case APPEND:
|
||||
Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
|
||||
Append append = ProtobufUtil.toAppend(mutation, cellScanner);
|
||||
nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
|
||||
++countOfCompleteMutation;
|
||||
checkCellSizeLimit(region, append);
|
||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
|
||||
|
@ -654,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
|
||||
}
|
||||
if (result == null) {
|
||||
result = region.checkAndMutate(checkAndMutate);
|
||||
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
|
||||
}
|
||||
|
@ -909,21 +913,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement)
|
||||
throws IOException {
|
||||
// Just throw the exception. The exception will be caught and then added to region-level
|
||||
// exception for RegionAction. Leaving the null to action result is ok since the null
|
||||
// result is viewed as failure by hbase client. And the region-lever exception will be used
|
||||
// to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
|
||||
// AsyncBatchRpcRetryingCaller#onComplete for more details.
|
||||
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
|
||||
doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
|
||||
}
|
||||
|
||||
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
|
||||
try {
|
||||
doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
|
||||
doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
|
||||
spaceQuotaEnforcement, false);
|
||||
} catch (IOException e) {
|
||||
// Set the exception for each action. The mutations in same RegionAction are group to
|
||||
// different batch and then be processed individually. Hence, we don't set the region-level
|
||||
|
@ -943,8 +948,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
|
||||
final OperationQuota quota, final List<ClientProtos.Action> mutations,
|
||||
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
|
||||
throws IOException {
|
||||
final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement,
|
||||
boolean atomic) throws IOException {
|
||||
Mutation[] mArray = new Mutation[mutations.size()];
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
boolean batchContainsPuts = false, batchContainsDelete = false;
|
||||
|
@ -958,6 +963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
|
||||
int i = 0;
|
||||
long nonce = HConstants.NO_NONCE;
|
||||
for (ClientProtos.Action action: mutations) {
|
||||
if (action.hasGet()) {
|
||||
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
|
||||
|
@ -978,10 +984,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
case INCREMENT:
|
||||
mutation = ProtobufUtil.toIncrement(m, cells);
|
||||
nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
|
||||
break;
|
||||
|
||||
case APPEND:
|
||||
mutation = ProtobufUtil.toAppend(m, cells);
|
||||
nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -1006,7 +1014,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
|
||||
}
|
||||
|
||||
OperationStatus[] codes = region.batchMutate(mArray, atomic);
|
||||
OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
|
||||
|
||||
// When atomic is true, it indicates that the mutateRow API or the batch API with
|
||||
// RowMutations is called. In this case, we need to merge the results of the
|
||||
|
@ -2758,7 +2766,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
try {
|
||||
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
|
||||
cellScanner, request.getCondition(), spaceQuotaEnforcement);
|
||||
cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
|
||||
responseBuilder.setProcessed(result.isSuccess());
|
||||
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
|
||||
ClientProtos.ResultOrException.newBuilder();
|
||||
|
@ -2815,7 +2823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (regionAction.getActionCount() == 1) {
|
||||
CheckAndMutateResult result = checkAndMutate(region, quota,
|
||||
regionAction.getAction(0).getMutation(), cellScanner,
|
||||
regionAction.getCondition(), spaceQuotaEnforcement);
|
||||
regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
|
||||
regionActionResultBuilder.setProcessed(result.isSuccess());
|
||||
resultOrExceptionOrBuilder.setIndex(0);
|
||||
if (result.getResult() != null) {
|
||||
|
@ -2824,7 +2832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
|
||||
} else {
|
||||
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
|
||||
cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
|
||||
cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
|
||||
regionActionResultBuilder.setProcessed(result.isSuccess());
|
||||
for (int i = 0; i < regionAction.getActionCount(); i++) {
|
||||
if (i == 0 && result.getResult() != null) {
|
||||
|
@ -2850,7 +2858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
} else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
|
||||
try {
|
||||
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
||||
cellScanner, spaceQuotaEnforcement);
|
||||
cellScanner, nonceGroup, spaceQuotaEnforcement);
|
||||
regionActionResultBuilder.setProcessed(true);
|
||||
// We no longer use MultiResponse#processed. Instead, we use
|
||||
// RegionActionResult#processed. This is for backward compatibility for old clients.
|
||||
|
@ -2963,7 +2971,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
|
||||
if (request.hasCondition()) {
|
||||
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
|
||||
request.getCondition(), spaceQuotaEnforcement);
|
||||
request.getCondition(), nonceGroup, spaceQuotaEnforcement);
|
||||
builder.setProcessed(result.isSuccess());
|
||||
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
|
||||
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
|
||||
|
@ -3047,11 +3055,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
|
||||
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
|
||||
MutationProto mutation, CellScanner cellScanner, Condition condition,
|
||||
MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
|
||||
ActivePolicyEnforcement spaceQuota) throws IOException {
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
|
||||
cellScanner);
|
||||
long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
|
||||
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
|
||||
spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
|
||||
quota.addMutation((Mutation) checkAndMutate.getAction());
|
||||
|
@ -3061,7 +3070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
|
||||
}
|
||||
if (result == null) {
|
||||
result = region.checkAndMutate(checkAndMutate);
|
||||
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -57,13 +60,17 @@ public class TestAsyncTableNoncedRetry {
|
|||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("async");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("cf");
|
||||
|
||||
private static byte[] QUALIFIER = Bytes.toBytes("cq");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("cq");
|
||||
|
||||
private static byte[] VALUE = Bytes.toBytes("value");
|
||||
private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
|
||||
|
||||
private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
|
||||
|
||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||
|
||||
private static AsyncConnection ASYNC_CONN;
|
||||
|
||||
|
@ -72,9 +79,14 @@ public class TestAsyncTableNoncedRetry {
|
|||
|
||||
private byte[] row;
|
||||
|
||||
private static AtomicInteger CALLED = new AtomicInteger();
|
||||
private static final AtomicInteger CALLED = new AtomicInteger();
|
||||
|
||||
private static long SLEEP_TIME = 2000;
|
||||
private static final long SLEEP_TIME = 2000;
|
||||
|
||||
private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
|
||||
|
||||
// The number of miniBatchOperations that are executed in a RegionServer
|
||||
private static int miniBatchOperationCount;
|
||||
|
||||
public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
|
||||
|
||||
|
@ -84,21 +96,12 @@ public class TestAsyncTableNoncedRetry {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
|
||||
Result result) throws IOException {
|
||||
if (CALLED.getAndIncrement() == 0) {
|
||||
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
MiniBatchOperationInProgress<Mutation> miniBatchOp) {
|
||||
// We sleep when the last of the miniBatchOperations is executed
|
||||
if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
|
||||
Threads.sleepWithoutInterrupt(SLEEP_TIME);
|
||||
}
|
||||
return RegionObserver.super.postAppend(c, append, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Increment increment, Result result) throws IOException {
|
||||
if (CALLED.getAndIncrement() == 0) {
|
||||
Threads.sleepWithoutInterrupt(SLEEP_TIME);
|
||||
}
|
||||
return RegionObserver.super.postIncrement(c, increment, result);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,8 +132,11 @@ public class TestAsyncTableNoncedRetry {
|
|||
public void testAppend() throws InterruptedException, ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
|
@ -141,9 +147,12 @@ public class TestAsyncTableNoncedRetry {
|
|||
ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
|
||||
.setReturnResults(false)).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isEmpty());
|
||||
|
@ -153,10 +162,14 @@ public class TestAsyncTableNoncedRetry {
|
|||
public void testIncrement() throws InterruptedException, ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
|
||||
assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -164,11 +177,218 @@ public class TestAsyncTableNoncedRetry {
|
|||
ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
|
||||
.setReturnResults(false)).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInRowMutations()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInRowMutations()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementAndAppendInRowMutations()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException,
|
||||
ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
|
||||
ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
|
||||
ExecutionException, IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatch() throws InterruptedException,
|
||||
ExecutionException, IOException {
|
||||
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
|
||||
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
|
||||
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
|
||||
byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
|
||||
byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
|
||||
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
|
||||
.setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
miniBatchOperationCount = 6;
|
||||
List<Object> results = table.batchAll(Arrays.asList(
|
||||
new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
|
||||
new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
|
||||
new RowMutations(row3)
|
||||
.add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
|
||||
CheckAndMutate.newBuilder(row4)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
|
||||
CheckAndMutate.newBuilder(row5)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
|
||||
CheckAndMutate.newBuilder(row6)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))).get();
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
|
||||
// should be called 12 times as 6 miniBatchOperations are called twice
|
||||
assertEquals(12, CALLED.get());
|
||||
|
||||
assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
|
||||
|
||||
assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
|
||||
|
||||
assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
|
||||
|
||||
CheckAndMutateResult result;
|
||||
|
||||
result = (CheckAndMutateResult) results.get(3);
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
|
||||
result = (CheckAndMutateResult) results.get(4);
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
|
||||
result = (CheckAndMutateResult) results.get(5);
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,365 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestHTableNoncedRetry {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHTableNoncedRetry.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("async");
|
||||
|
||||
private static final byte[] FAMILY = Bytes.toBytes("cf");
|
||||
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("cq");
|
||||
|
||||
private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
|
||||
|
||||
private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
|
||||
|
||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||
|
||||
private static Connection CONN;
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
private byte[] row;
|
||||
|
||||
private Table table;
|
||||
|
||||
private static final AtomicInteger CALLED = new AtomicInteger();
|
||||
|
||||
private static final int SLEEP_TIME = 2000;
|
||||
|
||||
private static final int RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
|
||||
|
||||
// The number of miniBatchOperations that are executed in a RegionServer
|
||||
private static int miniBatchOperationCount;
|
||||
|
||||
public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
MiniBatchOperationInProgress<Mutation> miniBatchOp) {
|
||||
// We sleep when the last of the miniBatchOperations is executed
|
||||
if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
|
||||
Threads.sleepWithoutInterrupt(SLEEP_TIME);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TEST_UTIL.getAdmin()
|
||||
.createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
|
||||
.setCoprocessor(SleepOnceCP.class.getName()).build());
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
Closeables.close(CONN, true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, InterruptedException {
|
||||
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
|
||||
CALLED.set(0);
|
||||
|
||||
table = CONN.getTable(TABLE_NAME);
|
||||
table.setRpcTimeout(RPC_TIMEOUT);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
table.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppend() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendWhenReturnResultsEqualsFalse() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
|
||||
.setReturnResults(false));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L);
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementWhenReturnResultsEqualsFalse() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
|
||||
.setReturnResults(false));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInRowMutations() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInRowMutations() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementAndAppendInRowMutations() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
Result result = table.mutateRow(new RowMutations(row)
|
||||
.add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInCheckAndMutate() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInCheckAndMutate() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementInRowMutationsInCheckAndMutate() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendInRowMutationsInCheckAndMutate() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
|
||||
.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws IOException {
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 1;
|
||||
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))));
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
assertEquals(2, CALLED.get());
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatch() throws IOException, InterruptedException {
|
||||
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
|
||||
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
|
||||
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
|
||||
byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
|
||||
byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
|
||||
|
||||
assertEquals(0, CALLED.get());
|
||||
|
||||
miniBatchOperationCount = 6;
|
||||
Object[] results = new Object[6];
|
||||
table.batch(Arrays.asList(
|
||||
new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
|
||||
new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
|
||||
new RowMutations(row3)
|
||||
.add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
|
||||
CheckAndMutate.newBuilder(row4)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
|
||||
CheckAndMutate.newBuilder(row5)
|
||||
.ifNotExists(FAMILY, QUALIFIER2)
|
||||
.build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
|
||||
CheckAndMutate.newBuilder(row6)
|
||||
.ifNotExists(FAMILY, QUALIFIER3)
|
||||
.build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
|
||||
.add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))), results);
|
||||
|
||||
// make sure we called twice and the result is still correct
|
||||
|
||||
// should be called 12 times as 6 miniBatchOperations are called twice
|
||||
assertEquals(12, CALLED.get());
|
||||
|
||||
assertArrayEquals(VALUE, ((Result) results[0]).getValue(FAMILY, QUALIFIER));
|
||||
|
||||
assertEquals(1L, Bytes.toLong(((Result) results[1]).getValue(FAMILY, QUALIFIER)));
|
||||
|
||||
assertEquals(1L, Bytes.toLong(((Result) results[2]).getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, ((Result) results[2]).getValue(FAMILY, QUALIFIER2));
|
||||
|
||||
CheckAndMutateResult result;
|
||||
|
||||
result = (CheckAndMutateResult) results[3];
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
|
||||
result = (CheckAndMutateResult) results[4];
|
||||
assertTrue(result.isSuccess());
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
|
||||
|
||||
result = (CheckAndMutateResult) results[5];
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
|
||||
assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue