HBASE-11796 Add client support for atomic checkAndMutate (Srikanth Srungarapu)

This commit is contained in:
Andrew Purtell 2014-09-24 15:07:38 -07:00
parent 9152d8677e
commit 011bc04416
12 changed files with 767 additions and 83 deletions

View File

@ -1321,6 +1321,35 @@ public class HTable implements HTableInterface, RegionLocator {
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
RegionServerCallable<Boolean> callable =
new RegionServerCallable<Boolean>(connection, getName(), row) {
@Override
public Boolean call(int callTimeout) throws IOException {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(tableName);
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
MultiRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, rm);
ClientProtos.MultiResponse response = getStub().multi(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/

View File

@ -18,6 +18,11 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
@ -31,10 +36,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* Used to communicate with a single HBase table.
@ -598,4 +603,21 @@ public interface Table extends Closeable {
<R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype,
Batch.Callback<R> callback) throws ServiceException, Throwable;
/**
* Atomically checks if a row/family/qualifier value matches the expected value.
* If it does, it performs the row mutations. If the passed value is null, the check
* is for the lack of column (ie: non-existence)
*
* @param row to check
* @param family column family to check
* @param qualifier column qualifier to check
* @param compareOp the comparison operator
* @param value the expected value
* @param mutation mutations to perform if check succeeds
* @throws IOException e
* @return true if the new put was executed, false otherwise
*/
boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException;
}

View File

@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunnin
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -263,6 +264,52 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a protocol buffer MutateRequest for conditioned row mutations
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param comparator
* @param compareType
* @param rowMutations
* @return a mutate request
* @throws IOException
*/
public static ClientProtos.MultiRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final ByteArrayComparable comparator,
final CompareType compareType, final RowMutations rowMutations) throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
builder.setAtomic(true);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
mutation.getClass().getName());
}
mutationBuilder.clear();
MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
ClientProtos.MultiRequest request =
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
.setCondition(condition).build();
return request;
}
/**
* Create a protocol buffer MutateRequest for a put
*

View File

@ -28429,6 +28429,20 @@ public final class ClientProtos {
* <code>optional uint64 nonceGroup = 2;</code>
*/
long getNonceGroup();
// optional .Condition condition = 3;
/**
* <code>optional .Condition condition = 3;</code>
*/
boolean hasCondition();
/**
* <code>optional .Condition condition = 3;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition();
/**
* <code>optional .Condition condition = 3;</code>
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder();
}
/**
* Protobuf type {@code MultiRequest}
@ -28503,6 +28517,19 @@ public final class ClientProtos {
nonceGroup_ = input.readUInt64();
break;
}
case 26: {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder subBuilder = null;
if (((bitField0_ & 0x00000002) == 0x00000002)) {
subBuilder = condition_.toBuilder();
}
condition_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(condition_);
condition_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000002;
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -28598,9 +28625,32 @@ public final class ClientProtos {
return nonceGroup_;
}
// optional .Condition condition = 3;
public static final int CONDITION_FIELD_NUMBER = 3;
private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_;
/**
* <code>optional .Condition condition = 3;</code>
*/
public boolean hasCondition() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() {
return condition_;
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() {
return condition_;
}
private void initFields() {
regionAction_ = java.util.Collections.emptyList();
nonceGroup_ = 0L;
condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -28613,6 +28663,12 @@ public final class ClientProtos {
return false;
}
}
if (hasCondition()) {
if (!getCondition().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
@ -28626,6 +28682,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeUInt64(2, nonceGroup_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(3, condition_);
}
getUnknownFields().writeTo(output);
}
@ -28643,6 +28702,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, nonceGroup_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, condition_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -28673,6 +28736,11 @@ public final class ClientProtos {
result = result && (getNonceGroup()
== other.getNonceGroup());
}
result = result && (hasCondition() == other.hasCondition());
if (hasCondition()) {
result = result && getCondition()
.equals(other.getCondition());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -28694,6 +28762,10 @@ public final class ClientProtos {
hash = (37 * hash) + NONCEGROUP_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getNonceGroup());
}
if (hasCondition()) {
hash = (37 * hash) + CONDITION_FIELD_NUMBER;
hash = (53 * hash) + getCondition().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -28805,6 +28877,7 @@ public final class ClientProtos {
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getRegionActionFieldBuilder();
getConditionFieldBuilder();
}
}
private static Builder create() {
@ -28821,6 +28894,12 @@ public final class ClientProtos {
}
nonceGroup_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
if (conditionBuilder_ == null) {
condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
} else {
conditionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -28862,6 +28941,14 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000001;
}
result.nonceGroup_ = nonceGroup_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000002;
}
if (conditionBuilder_ == null) {
result.condition_ = condition_;
} else {
result.condition_ = conditionBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -28907,6 +28994,9 @@ public final class ClientProtos {
if (other.hasNonceGroup()) {
setNonceGroup(other.getNonceGroup());
}
if (other.hasCondition()) {
mergeCondition(other.getCondition());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -28918,6 +29008,12 @@ public final class ClientProtos {
return false;
}
}
if (hasCondition()) {
if (!getCondition().isInitialized()) {
return false;
}
}
return true;
}
@ -29213,6 +29309,123 @@ public final class ClientProtos {
return this;
}
// optional .Condition condition = 3;
private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder> conditionBuilder_;
/**
* <code>optional .Condition condition = 3;</code>
*/
public boolean hasCondition() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() {
if (conditionBuilder_ == null) {
return condition_;
} else {
return conditionBuilder_.getMessage();
}
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public Builder setCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) {
if (conditionBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
condition_ = value;
onChanged();
} else {
conditionBuilder_.setMessage(value);
}
bitField0_ |= 0x00000004;
return this;
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public Builder setCondition(
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder builderForValue) {
if (conditionBuilder_ == null) {
condition_ = builderForValue.build();
onChanged();
} else {
conditionBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000004;
return this;
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public Builder mergeCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) {
if (conditionBuilder_ == null) {
if (((bitField0_ & 0x00000004) == 0x00000004) &&
condition_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance()) {
condition_ =
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.newBuilder(condition_).mergeFrom(value).buildPartial();
} else {
condition_ = value;
}
onChanged();
} else {
conditionBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000004;
return this;
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public Builder clearCondition() {
if (conditionBuilder_ == null) {
condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
onChanged();
} else {
conditionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder getConditionBuilder() {
bitField0_ |= 0x00000004;
onChanged();
return getConditionFieldBuilder().getBuilder();
}
/**
* <code>optional .Condition condition = 3;</code>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() {
if (conditionBuilder_ != null) {
return conditionBuilder_.getMessageOrBuilder();
} else {
return condition_;
}
}
/**
* <code>optional .Condition condition = 3;</code>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder>
getConditionFieldBuilder() {
if (conditionBuilder_ == null) {
conditionBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder>(
condition_,
getParentForChildren(),
isClean());
condition_ = null;
}
return conditionBuilder_;
}
// @@protoc_insertion_point(builder_scope:MultiRequest)
}
@ -29251,6 +29464,24 @@ public final class ClientProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResultOrBuilder getRegionActionResultOrBuilder(
int index);
// optional bool processed = 2;
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
boolean hasProcessed();
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
boolean getProcessed();
}
/**
* Protobuf type {@code MultiResponse}
@ -29311,6 +29542,11 @@ public final class ClientProtos {
regionActionResult_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult.PARSER, extensionRegistry));
break;
}
case 16: {
bitField0_ |= 0x00000001;
processed_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -29353,6 +29589,7 @@ public final class ClientProtos {
return PARSER;
}
private int bitField0_;
// repeated .RegionActionResult regionActionResult = 1;
public static final int REGIONACTIONRESULT_FIELD_NUMBER = 1;
private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult> regionActionResult_;
@ -29389,8 +29626,33 @@ public final class ClientProtos {
return regionActionResult_.get(index);
}
// optional bool processed = 2;
public static final int PROCESSED_FIELD_NUMBER = 2;
private boolean processed_;
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public boolean hasProcessed() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public boolean getProcessed() {
return processed_;
}
private void initFields() {
regionActionResult_ = java.util.Collections.emptyList();
processed_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -29413,6 +29675,9 @@ public final class ClientProtos {
for (int i = 0; i < regionActionResult_.size(); i++) {
output.writeMessage(1, regionActionResult_.get(i));
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBool(2, processed_);
}
getUnknownFields().writeTo(output);
}
@ -29426,6 +29691,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, regionActionResult_.get(i));
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(2, processed_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -29451,6 +29720,11 @@ public final class ClientProtos {
boolean result = true;
result = result && getRegionActionResultList()
.equals(other.getRegionActionResultList());
result = result && (hasProcessed() == other.hasProcessed());
if (hasProcessed()) {
result = result && (getProcessed()
== other.getProcessed());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -29468,6 +29742,10 @@ public final class ClientProtos {
hash = (37 * hash) + REGIONACTIONRESULT_FIELD_NUMBER;
hash = (53 * hash) + getRegionActionResultList().hashCode();
}
if (hasProcessed()) {
hash = (37 * hash) + PROCESSED_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getProcessed());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -29584,6 +29862,8 @@ public final class ClientProtos {
} else {
regionActionResultBuilder_.clear();
}
processed_ = false;
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@ -29611,6 +29891,7 @@ public final class ClientProtos {
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (regionActionResultBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
regionActionResult_ = java.util.Collections.unmodifiableList(regionActionResult_);
@ -29620,6 +29901,11 @@ public final class ClientProtos {
} else {
result.regionActionResult_ = regionActionResultBuilder_.build();
}
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000001;
}
result.processed_ = processed_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@ -29661,6 +29947,9 @@ public final class ClientProtos {
}
}
}
if (other.hasProcessed()) {
setProcessed(other.getProcessed());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -29934,6 +30223,55 @@ public final class ClientProtos {
return regionActionResultBuilder_;
}
// optional bool processed = 2;
private boolean processed_ ;
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public boolean hasProcessed() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public boolean getProcessed() {
return processed_;
}
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public Builder setProcessed(boolean value) {
bitField0_ |= 0x00000002;
processed_ = value;
onChanged();
return this;
}
/**
* <code>optional bool processed = 2;</code>
*
* <pre>
* used for mutate to indicate processed only
* </pre>
*/
public Builder clearProcessed() {
bitField0_ = (bitField0_ & ~0x00000002);
processed_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:MultiResponse)
}
@ -30778,20 +31116,22 @@ public final class ClientProtos {
"essorServiceResult\"f\n\022RegionActionResult",
"\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" +
"ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" +
"ir\"G\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" +
"2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\"@\n\rM" +
"ultiResponse\022/\n\022regionActionResult\030\001 \003(\013" +
"2\023.RegionActionResult*\'\n\013Consistency\022\n\n\006" +
"STRONG\020\000\022\014\n\010TIMELINE\020\0012\261\002\n\rClientService" +
"\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mu" +
"tate\022\016.MutateRequest\032\017.MutateResponse\022#\n" +
"\004Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBu",
"lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
"kLoadHFileResponse\022F\n\013ExecService\022\032.Copr" +
"ocessorServiceRequest\032\033.CoprocessorServi" +
"ceResponse\022&\n\005Multi\022\r.MultiRequest\032\016.Mul" +
"tiResponseBB\n*org.apache.hadoop.hbase.pr" +
"otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
"ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" +
"2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" +
"ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" +
"nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" +
"ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" +
"stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\261\002\n\rCli" +
"entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" +
"onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR",
"esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" +
"onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" +
"uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" +
"ice\022\032.CoprocessorServiceRequest\032\033.Coproc" +
"essorServiceResponse\022&\n\005Multi\022\r.MultiReq" +
"uest\032\016.MultiResponseBB\n*org.apache.hadoo" +
"p.hbase.protobuf.generatedB\014ClientProtos" +
"H\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -30965,13 +31305,13 @@ public final class ClientProtos {
internal_static_MultiRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiRequest_descriptor,
new java.lang.String[] { "RegionAction", "NonceGroup", });
new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", });
internal_static_MultiResponse_descriptor =
getDescriptor().getMessageTypes().get(25);
internal_static_MultiResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiResponse_descriptor,
new java.lang.String[] { "RegionActionResult", });
new java.lang.String[] { "RegionActionResult", "Processed", });
return null;
}
};

View File

@ -387,10 +387,13 @@ message RegionActionResult {
message MultiRequest {
repeated RegionAction regionAction = 1;
optional uint64 nonceGroup = 2;
optional Condition condition = 3;
}
message MultiResponse {
repeated RegionActionResult regionActionResult = 1;
// used for mutate to indicate processed only
optional bool processed = 2;
}

View File

@ -18,29 +18,13 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;
@ -48,10 +32,11 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.io.MultipleIOException;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* A wrapper for HTable. Can be used to restrict privilege.
@ -353,4 +338,10 @@ public class HTableWrapper implements HTableInterface {
table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
}
}

View File

@ -2967,6 +2967,89 @@ public class HRegion implements HeapSize { // , Writable{
}
}
//TODO, Think that gets/puts and deletes should be refactored a bit so that
//the getting of the lock happens before, so that you would just pass it into
//the methods. So in the case of checkAndMutate you could just do lockRow,
//get, put, unlockRow or something
/**
*
* @throws IOException
* @return true if the new put was executed, false otherwise
*/
public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
boolean writeToWAL)
throws IOException{
checkReadOnly();
//TODO, add check for value length or maybe even better move this to the
//client if this becomes a global setting
checkResources();
startRegionOperation();
try {
Get get = new Get(row);
checkFamily(family);
get.addColumn(family, qualifier);
// Lock row - note that doBatchMutate will relock this row if called
RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.waitForPreviousTransactionsComplete();
try {
List<Cell> result = get(get, false);
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
boolean matches = false;
if (result.size() == 0 && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
valueIsNull) {
matches = true;
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
int compareResult = comparator.compareTo(kv.getValueArray(),
kv.getValueOffset(), kv.getValueLength());
switch (compareOp) {
case LESS:
matches = compareResult < 0;
break;
case LESS_OR_EQUAL:
matches = compareResult <= 0;
break;
case EQUAL:
matches = compareResult == 0;
break;
case NOT_EQUAL:
matches = compareResult != 0;
break;
case GREATER_OR_EQUAL:
matches = compareResult >= 0;
break;
case GREATER:
matches = compareResult > 0;
break;
default:
throw new RuntimeException("Unknown Compare op " + compareOp.name());
}
}
//If matches put the new put or delete the new delete
if (matches) {
// All edits for the given row (across all column families) must
// happen atomically.
mutateRow(rm);
this.checkAndMutateChecksPassed.increment();
return true;
}
this.checkAndMutateChecksFailed.increment();
return false;
} finally {
rowLock.release();
}
} finally {
closeRegionOperation();
}
}
private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
// Currently this is only called for puts and deletes, so no nonces.
OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },

View File

@ -18,23 +18,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -165,11 +153,22 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* Implements the regionserver RPC services.
@ -385,6 +384,48 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region.mutateRow(rm);
}
/**
* Mutate a list of rows atomically.
*
* @param region
* @param actions
* @param cellScanner if non-null, the mutation data -- the Cell content.
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator @throws IOException
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
RowMutations rm = null;
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
MutationType type = action.getMutation().getMutateType();
if (rm == null) {
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
}
/**
* Execute an append mutation.
*
@ -1711,6 +1752,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
Boolean processed = null;
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
@ -1730,7 +1772,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// How does this call happen? It may need some work to play well w/ the surroundings.
// Need to return an item per Action along w/ Action index. TODO.
try {
mutateRows(region, regionAction.getActionList(), cellScanner);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.getFamily().toByteArray();
byte[] qualifier = condition.getQualifier().toByteArray();
CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator =
ProtobufUtil.toComparator(condition.getComparator());
processed = checkAndRowMutate(region, regionAction.getActionList(),
cellScanner, row, family, qualifier, compareOp, comparator);
} else {
mutateRows(region, regionAction.getActionList(), cellScanner);
processed = Boolean.TRUE;
}
} catch (IOException e) {
// As it's atomic, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
@ -1747,6 +1802,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
if (processed != null) responseBuilder.setProcessed(processed);
return responseBuilder.build();
}

View File

@ -19,16 +19,10 @@
package org.apache.hadoop.hbase.rest.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -67,10 +61,15 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
/**
* HTable interface to remote tables accessed via REST gateway
@ -851,4 +850,9 @@ public class RemoteHTable implements HTableInterface {
throws ServiceException, Throwable {
throw new UnsupportedOperationException("batchCoprocessorService not implemented");
}
@Override public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
throw new UnsupportedOperationException("checkAndMutate not implemented");
}
}

View File

@ -0,0 +1,103 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertTrue;
@Category(MediumTests.class)
public class TestCheckAndMutate {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testCheckAndMutate() throws Exception {
final TableName tableName = TableName.valueOf("TestPutWithDelete");
final byte[] rowKey = Bytes.toBytes("12345");
final byte[] family = Bytes.toBytes("cf");
HTable table = TEST_UTIL.createTable(tableName, family);
TEST_UTIL.waitTableAvailable(tableName.getName(), 5000);
try {
// put one row
Put put = new Put(rowKey);
put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.add(family, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put);
// get row back and assert the values
Get get = new Get(rowKey);
Result result = table.get(get);
assertTrue("Column A value should be a",
Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a"));
assertTrue("Column B value should be b",
Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b"));
assertTrue("Column C value should be c",
Bytes.toString(result.getValue(family, Bytes.toBytes("C"))).equals("c"));
// put the same row again with C column deleted
RowMutations rm = new RowMutations(rowKey);
put = new Put(rowKey);
put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b"));
rm.add(put);
Delete del = new Delete(rowKey);
del.deleteColumn(family, Bytes.toBytes("C"));
rm.add(del);
boolean res = table.checkAndMutate(rowKey, family, Bytes.toBytes("A"), CompareFilter.CompareOp.EQUAL,
Bytes.toBytes("a"), rm);
assertTrue(res);
// get row back and assert the values
get = new Get(rowKey);
result = table.get(get);
assertTrue("Column A value should be a",
Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a"));
assertTrue("Column B value should be b",
Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b"));
assertTrue("Column C should not exist",
result.getValue(family, Bytes.toBytes("C")) == null);
} finally {
table.close();
}
}
}

View File

@ -1510,7 +1510,7 @@ public class TestHRegion {
Delete delete = new Delete(row1);
delete.deleteFamily(fam1);
res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
delete, true);
put, true);
assertEquals(false, res);
} finally {
HRegion.closeHRegion(this.region);

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
@ -671,5 +670,12 @@ public class HTablePool implements Closeable {
checkState();
table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
byte[] value, RowMutations mutation) throws IOException {
checkState();
return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
}
}
}