HBASE-11796 Add client support for atomic checkAndMutate (Srikanth Srungarapu)
parent 5e096c5dea
commit dab2af79ea
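In short, this change threads a new checkAndMutate(row, family, qualifier, compareOp, value, RowMutations) operation from the Table interface down to HRegion, reusing the existing Multi RPC: the client attaches a Condition to MultiRequest, and the server reports the outcome in a new MultiResponse.processed flag. A minimal usage sketch, adapted from the TestCheckAndMutate test added at the end of this diff (table, row, and column names are illustrative):

    // Apply a Put and a Delete to one row atomically, but only if cf:A still
    // holds the value "a"; nothing is written when the check fails.
    RowMutations rm = new RowMutations(rowKey);
    Put put = new Put(rowKey);
    put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
    rm.add(put);
    Delete del = new Delete(rowKey);
    del.deleteColumn(family, Bytes.toBytes("C"));
    rm.add(del);
    boolean applied = table.checkAndMutate(rowKey, family, Bytes.toBytes("A"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("a"), rm);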
@@ -1321,6 +1321,35 @@ public class HTable implements HTableInterface, RegionLocator {
    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
      final CompareOp compareOp, final byte [] value, final RowMutations rm)
      throws IOException {
    RegionServerCallable<Boolean> callable =
        new RegionServerCallable<Boolean>(connection, getName(), row) {
          @Override
          public Boolean call(int callTimeout) throws IOException {
            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
            controller.setPriority(tableName);
            controller.setCallTimeout(callTimeout);
            try {
              CompareType compareType = CompareType.valueOf(compareOp.name());
              MultiRequest request = RequestConverter.buildMutateRequest(
                  getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                  new BinaryComparator(value), compareType, rm);
              ClientProtos.MultiResponse response = getStub().multi(controller, request);
              return Boolean.valueOf(response.getProcessed());
            } catch (ServiceException se) {
              throw ProtobufUtil.getRemoteException(se);
            }
          }
        };
    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
  }

  /**
   * {@inheritDoc}
   */

@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;

@@ -666,5 +665,12 @@ public class HTablePool implements Closeable {
      checkState();
      table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
    }

    @Override
    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
        byte[] value, RowMutations mutation) throws IOException {
      checkState();
      return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
    }
  }
}

@@ -18,6 +18,11 @@
 */
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;

@@ -31,10 +36,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;

/**
 * Used to communicate with a single HBase table.

@@ -598,4 +603,21 @@ public interface Table extends Closeable {
  <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
      Message request, byte[] startKey, byte[] endKey, R responsePrototype,
      Batch.Callback<R> callback) throws ServiceException, Throwable;

  /**
   * Atomically checks if a row/family/qualifier value matches the expected value.
   * If it does, it performs the row mutations. If the passed value is null, the check
   * is for the lack of column (i.e., non-existence).
   *
   * @param row to check
   * @param family column family to check
   * @param qualifier column qualifier to check
   * @param compareOp the comparison operator
   * @param value the expected value
   * @param mutation mutations to perform if check succeeds
   * @throws IOException e
   * @return true if the mutations were executed, false otherwise
   */
  boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
      CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException;
}

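Per the javadoc above, a null expected value turns the check into a non-existence test. A sketch of that pattern (names are illustrative; the null value travels through the same BinaryComparator path shown in the HTable hunk, mirroring the long-standing checkAndPut behavior):

    // Create cf:lock only if no cf:lock cell exists yet; with a null expected
    // value the server-side check passes on column absence.
    RowMutations rm = new RowMutations(row);
    Put put = new Put(row);
    put.add(family, Bytes.toBytes("lock"), Bytes.toBytes("held"));
    rm.add(put);
    boolean acquired = table.checkAndMutate(row, family, Bytes.toBytes("lock"),
        CompareFilter.CompareOp.EQUAL, null, rm);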
@@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunnin
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;

@@ -262,6 +263,52 @@ public final class RequestConverter {
    return builder.build();
  }

  /**
   * Create a protocol buffer MultiRequest for conditioned row mutations
   *
   * @param regionName
   * @param row
   * @param family
   * @param qualifier
   * @param comparator
   * @param compareType
   * @param rowMutations
   * @return a multi request
   * @throws IOException
   */
  public static ClientProtos.MultiRequest buildMutateRequest(
      final byte[] regionName, final byte[] row, final byte[] family,
      final byte [] qualifier, final ByteArrayComparable comparator,
      final CompareType compareType, final RowMutations rowMutations) throws IOException {
    RegionAction.Builder builder =
        getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
    builder.setAtomic(true);
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
    Condition condition = buildCondition(
        row, family, qualifier, comparator, compareType);
    for (Mutation mutation: rowMutations.getMutations()) {
      MutationType mutateType = null;
      if (mutation instanceof Put) {
        mutateType = MutationType.PUT;
      } else if (mutation instanceof Delete) {
        mutateType = MutationType.DELETE;
      } else {
        throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
            mutation.getClass().getName());
      }
      mutationBuilder.clear();
      MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
      actionBuilder.clear();
      actionBuilder.setMutation(mp);
      builder.addAction(actionBuilder.build());
    }
    ClientProtos.MultiRequest request =
        ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
            .setCondition(condition).build();
    return request;
  }

  /**
   * Create a protocol buffer MutateRequest for a put
   *

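The only call site this commit adds for buildMutateRequest is HTable.checkAndMutate above; in isolation the conversion step looks like the following sketch (variables are illustrative; buildCondition is the existing package-local helper referenced in the method body):

    // Map the client-level CompareOp onto the protobuf CompareType by enum
    // name, then build a single atomic RegionAction guarded by the condition.
    CompareType compareType = CompareType.valueOf(CompareOp.EQUAL.name());
    ClientProtos.MultiRequest request = RequestConverter.buildMutateRequest(
        regionName, row, family, qualifier,
        new BinaryComparator(expectedValue), compareType, rowMutations);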
@@ -28429,6 +28429,20 @@ public final class ClientProtos {
     * <code>optional uint64 nonceGroup = 2;</code>
     */
    long getNonceGroup();

    // optional .Condition condition = 3;
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    boolean hasCondition();
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition();
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder();
  }
  /**
   * Protobuf type {@code MultiRequest}

@@ -28503,6 +28517,19 @@ public final class ClientProtos {
              nonceGroup_ = input.readUInt64();
              break;
            }
            case 26: {
              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder subBuilder = null;
              if (((bitField0_ & 0x00000002) == 0x00000002)) {
                subBuilder = condition_.toBuilder();
              }
              condition_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.PARSER, extensionRegistry);
              if (subBuilder != null) {
                subBuilder.mergeFrom(condition_);
                condition_ = subBuilder.buildPartial();
              }
              bitField0_ |= 0x00000002;
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {

@@ -28598,9 +28625,32 @@ public final class ClientProtos {
      return nonceGroup_;
    }

    // optional .Condition condition = 3;
    public static final int CONDITION_FIELD_NUMBER = 3;
    private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_;
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    public boolean hasCondition() {
      return ((bitField0_ & 0x00000002) == 0x00000002);
    }
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() {
      return condition_;
    }
    /**
     * <code>optional .Condition condition = 3;</code>
     */
    public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() {
      return condition_;
    }

    private void initFields() {
      regionAction_ = java.util.Collections.emptyList();
      nonceGroup_ = 0L;
      condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
    }
    private byte memoizedIsInitialized = -1;
    public final boolean isInitialized() {

@@ -28613,6 +28663,12 @@ public final class ClientProtos {
          return false;
        }
      }
      if (hasCondition()) {
        if (!getCondition().isInitialized()) {
          memoizedIsInitialized = 0;
          return false;
        }
      }
      memoizedIsInitialized = 1;
      return true;
    }

@@ -28626,6 +28682,9 @@ public final class ClientProtos {
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        output.writeUInt64(2, nonceGroup_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        output.writeMessage(3, condition_);
      }
      getUnknownFields().writeTo(output);
    }

@@ -28643,6 +28702,10 @@ public final class ClientProtos {
        size += com.google.protobuf.CodedOutputStream
          .computeUInt64Size(2, nonceGroup_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        size += com.google.protobuf.CodedOutputStream
          .computeMessageSize(3, condition_);
      }
      size += getUnknownFields().getSerializedSize();
      memoizedSerializedSize = size;
      return size;

@@ -28673,6 +28736,11 @@ public final class ClientProtos {
        result = result && (getNonceGroup()
            == other.getNonceGroup());
      }
      result = result && (hasCondition() == other.hasCondition());
      if (hasCondition()) {
        result = result && getCondition()
            .equals(other.getCondition());
      }
      result = result &&
          getUnknownFields().equals(other.getUnknownFields());
      return result;

@@ -28694,6 +28762,10 @@ public final class ClientProtos {
        hash = (37 * hash) + NONCEGROUP_FIELD_NUMBER;
        hash = (53 * hash) + hashLong(getNonceGroup());
      }
      if (hasCondition()) {
        hash = (37 * hash) + CONDITION_FIELD_NUMBER;
        hash = (53 * hash) + getCondition().hashCode();
      }
      hash = (29 * hash) + getUnknownFields().hashCode();
      memoizedHashCode = hash;
      return hash;

@@ -28805,6 +28877,7 @@ public final class ClientProtos {
      private void maybeForceBuilderInitialization() {
        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
          getRegionActionFieldBuilder();
          getConditionFieldBuilder();
        }
      }
      private static Builder create() {

@@ -28821,6 +28894,12 @@ public final class ClientProtos {
        }
        nonceGroup_ = 0L;
        bitField0_ = (bitField0_ & ~0x00000002);
        if (conditionBuilder_ == null) {
          condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
        } else {
          conditionBuilder_.clear();
        }
        bitField0_ = (bitField0_ & ~0x00000004);
        return this;
      }

@@ -28862,6 +28941,14 @@ public final class ClientProtos {
          to_bitField0_ |= 0x00000001;
        }
        result.nonceGroup_ = nonceGroup_;
        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
          to_bitField0_ |= 0x00000002;
        }
        if (conditionBuilder_ == null) {
          result.condition_ = condition_;
        } else {
          result.condition_ = conditionBuilder_.build();
        }
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;

@@ -28907,6 +28994,9 @@ public final class ClientProtos {
        if (other.hasNonceGroup()) {
          setNonceGroup(other.getNonceGroup());
        }
        if (other.hasCondition()) {
          mergeCondition(other.getCondition());
        }
        this.mergeUnknownFields(other.getUnknownFields());
        return this;
      }

@@ -28918,6 +29008,12 @@ public final class ClientProtos {
            return false;
          }
        }
        if (hasCondition()) {
          if (!getCondition().isInitialized()) {

            return false;
          }
        }
        return true;
      }

@@ -29213,6 +29309,123 @@ public final class ClientProtos {
        return this;
      }

      // optional .Condition condition = 3;
      private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
      private com.google.protobuf.SingleFieldBuilder<
          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder> conditionBuilder_;
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public boolean hasCondition() {
        return ((bitField0_ & 0x00000004) == 0x00000004);
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition getCondition() {
        if (conditionBuilder_ == null) {
          return condition_;
        } else {
          return conditionBuilder_.getMessage();
        }
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public Builder setCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) {
        if (conditionBuilder_ == null) {
          if (value == null) {
            throw new NullPointerException();
          }
          condition_ = value;
          onChanged();
        } else {
          conditionBuilder_.setMessage(value);
        }
        bitField0_ |= 0x00000004;
        return this;
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public Builder setCondition(
          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder builderForValue) {
        if (conditionBuilder_ == null) {
          condition_ = builderForValue.build();
          onChanged();
        } else {
          conditionBuilder_.setMessage(builderForValue.build());
        }
        bitField0_ |= 0x00000004;
        return this;
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public Builder mergeCondition(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition value) {
        if (conditionBuilder_ == null) {
          if (((bitField0_ & 0x00000004) == 0x00000004) &&
              condition_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance()) {
            condition_ =
                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.newBuilder(condition_).mergeFrom(value).buildPartial();
          } else {
            condition_ = value;
          }
          onChanged();
        } else {
          conditionBuilder_.mergeFrom(value);
        }
        bitField0_ |= 0x00000004;
        return this;
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public Builder clearCondition() {
        if (conditionBuilder_ == null) {
          condition_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.getDefaultInstance();
          onChanged();
        } else {
          conditionBuilder_.clear();
        }
        bitField0_ = (bitField0_ & ~0x00000004);
        return this;
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder getConditionBuilder() {
        bitField0_ |= 0x00000004;
        onChanged();
        return getConditionFieldBuilder().getBuilder();
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder getConditionOrBuilder() {
        if (conditionBuilder_ != null) {
          return conditionBuilder_.getMessageOrBuilder();
        } else {
          return condition_;
        }
      }
      /**
       * <code>optional .Condition condition = 3;</code>
       */
      private com.google.protobuf.SingleFieldBuilder<
          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder>
          getConditionFieldBuilder() {
        if (conditionBuilder_ == null) {
          conditionBuilder_ = new com.google.protobuf.SingleFieldBuilder<
              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ConditionOrBuilder>(
                  condition_,
                  getParentForChildren(),
                  isClean());
          condition_ = null;
        }
        return conditionBuilder_;
      }

      // @@protoc_insertion_point(builder_scope:MultiRequest)
    }

@@ -29251,6 +29464,24 @@ public final class ClientProtos {
     */
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResultOrBuilder getRegionActionResultOrBuilder(
        int index);

    // optional bool processed = 2;
    /**
     * <code>optional bool processed = 2;</code>
     *
     * <pre>
     * used for mutate to indicate processed only
     * </pre>
     */
    boolean hasProcessed();
    /**
     * <code>optional bool processed = 2;</code>
     *
     * <pre>
     * used for mutate to indicate processed only
     * </pre>
     */
    boolean getProcessed();
  }
  /**
   * Protobuf type {@code MultiResponse}

@@ -29311,6 +29542,11 @@ public final class ClientProtos {
              regionActionResult_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult.PARSER, extensionRegistry));
              break;
            }
            case 16: {
              bitField0_ |= 0x00000001;
              processed_ = input.readBool();
              break;
            }
          }
        }
      } catch (com.google.protobuf.InvalidProtocolBufferException e) {

@@ -29353,6 +29589,7 @@ public final class ClientProtos {
      return PARSER;
    }

    private int bitField0_;
    // repeated .RegionActionResult regionActionResult = 1;
    public static final int REGIONACTIONRESULT_FIELD_NUMBER = 1;
    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult> regionActionResult_;

@@ -29389,8 +29626,33 @@ public final class ClientProtos {
      return regionActionResult_.get(index);
    }

    // optional bool processed = 2;
    public static final int PROCESSED_FIELD_NUMBER = 2;
    private boolean processed_;
    /**
     * <code>optional bool processed = 2;</code>
     *
     * <pre>
     * used for mutate to indicate processed only
     * </pre>
     */
    public boolean hasProcessed() {
      return ((bitField0_ & 0x00000001) == 0x00000001);
    }
    /**
     * <code>optional bool processed = 2;</code>
     *
     * <pre>
     * used for mutate to indicate processed only
     * </pre>
     */
    public boolean getProcessed() {
      return processed_;
    }

    private void initFields() {
      regionActionResult_ = java.util.Collections.emptyList();
      processed_ = false;
    }
    private byte memoizedIsInitialized = -1;
    public final boolean isInitialized() {

@@ -29413,6 +29675,9 @@ public final class ClientProtos {
      for (int i = 0; i < regionActionResult_.size(); i++) {
        output.writeMessage(1, regionActionResult_.get(i));
      }
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        output.writeBool(2, processed_);
      }
      getUnknownFields().writeTo(output);
    }

@@ -29426,6 +29691,10 @@ public final class ClientProtos {
        size += com.google.protobuf.CodedOutputStream
          .computeMessageSize(1, regionActionResult_.get(i));
      }
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        size += com.google.protobuf.CodedOutputStream
          .computeBoolSize(2, processed_);
      }
      size += getUnknownFields().getSerializedSize();
      memoizedSerializedSize = size;
      return size;

@@ -29451,6 +29720,11 @@ public final class ClientProtos {
      boolean result = true;
      result = result && getRegionActionResultList()
          .equals(other.getRegionActionResultList());
      result = result && (hasProcessed() == other.hasProcessed());
      if (hasProcessed()) {
        result = result && (getProcessed()
            == other.getProcessed());
      }
      result = result &&
          getUnknownFields().equals(other.getUnknownFields());
      return result;

@@ -29468,6 +29742,10 @@ public final class ClientProtos {
        hash = (37 * hash) + REGIONACTIONRESULT_FIELD_NUMBER;
        hash = (53 * hash) + getRegionActionResultList().hashCode();
      }
      if (hasProcessed()) {
        hash = (37 * hash) + PROCESSED_FIELD_NUMBER;
        hash = (53 * hash) + hashBoolean(getProcessed());
      }
      hash = (29 * hash) + getUnknownFields().hashCode();
      memoizedHashCode = hash;
      return hash;

@@ -29584,6 +29862,8 @@ public final class ClientProtos {
        } else {
          regionActionResultBuilder_.clear();
        }
        processed_ = false;
        bitField0_ = (bitField0_ & ~0x00000002);
        return this;
      }

@@ -29611,6 +29891,7 @@ public final class ClientProtos {
      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse buildPartial() {
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse(this);
        int from_bitField0_ = bitField0_;
        int to_bitField0_ = 0;
        if (regionActionResultBuilder_ == null) {
          if (((bitField0_ & 0x00000001) == 0x00000001)) {
            regionActionResult_ = java.util.Collections.unmodifiableList(regionActionResult_);

@@ -29620,6 +29901,11 @@ public final class ClientProtos {
        } else {
          result.regionActionResult_ = regionActionResultBuilder_.build();
        }
        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
          to_bitField0_ |= 0x00000001;
        }
        result.processed_ = processed_;
        result.bitField0_ = to_bitField0_;
        onBuilt();
        return result;
      }

@@ -29661,6 +29947,9 @@ public final class ClientProtos {
          }
        }
      }
      if (other.hasProcessed()) {
        setProcessed(other.getProcessed());
      }
      this.mergeUnknownFields(other.getUnknownFields());
      return this;
    }

@@ -29934,6 +30223,55 @@ public final class ClientProtos {
        return regionActionResultBuilder_;
      }

      // optional bool processed = 2;
      private boolean processed_ ;
      /**
       * <code>optional bool processed = 2;</code>
       *
       * <pre>
       * used for mutate to indicate processed only
       * </pre>
       */
      public boolean hasProcessed() {
        return ((bitField0_ & 0x00000002) == 0x00000002);
      }
      /**
       * <code>optional bool processed = 2;</code>
       *
       * <pre>
       * used for mutate to indicate processed only
       * </pre>
       */
      public boolean getProcessed() {
        return processed_;
      }
      /**
       * <code>optional bool processed = 2;</code>
       *
       * <pre>
       * used for mutate to indicate processed only
       * </pre>
       */
      public Builder setProcessed(boolean value) {
        bitField0_ |= 0x00000002;
        processed_ = value;
        onChanged();
        return this;
      }
      /**
       * <code>optional bool processed = 2;</code>
       *
       * <pre>
       * used for mutate to indicate processed only
       * </pre>
       */
      public Builder clearProcessed() {
        bitField0_ = (bitField0_ & ~0x00000002);
        processed_ = false;
        onChanged();
        return this;
      }

      // @@protoc_insertion_point(builder_scope:MultiResponse)
    }

@@ -30778,20 +31116,22 @@ public final class ClientProtos {
      "essorServiceResult\"f\n\022RegionActionResult",
      "\022-\n\021resultOrException\030\001 \003(\0132\022.ResultOrEx" +
      "ception\022!\n\texception\030\002 \001(\0132\016.NameBytesPa" +
-     "ir\"G\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" +
-     "2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\"@\n\rM" +
-     "ultiResponse\022/\n\022regionActionResult\030\001 \003(\013" +
-     "2\023.RegionActionResult*\'\n\013Consistency\022\n\n\006" +
-     "STRONG\020\000\022\014\n\010TIMELINE\020\0012\261\002\n\rClientService" +
-     "\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022)\n\006Mu" +
-     "tate\022\016.MutateRequest\032\017.MutateResponse\022#\n" +
-     "\004Scan\022\014.ScanRequest\032\r.ScanResponse\022>\n\rBu",
-     "lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
-     "kLoadHFileResponse\022F\n\013ExecService\022\032.Copr" +
-     "ocessorServiceRequest\032\033.CoprocessorServi" +
-     "ceResponse\022&\n\005Multi\022\r.MultiRequest\032\016.Mul" +
-     "tiResponseBB\n*org.apache.hadoop.hbase.pr" +
-     "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+     "ir\"f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\013" +
+     "2\r.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tc" +
+     "ondition\030\003 \001(\0132\n.Condition\"S\n\rMultiRespo" +
+     "nse\022/\n\022regionActionResult\030\001 \003(\0132\023.Region" +
+     "ActionResult\022\021\n\tprocessed\030\002 \001(\010*\'\n\013Consi" +
+     "stency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\261\002\n\rCli" +
+     "entService\022 \n\003Get\022\013.GetRequest\032\014.GetResp" +
+     "onse\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateR",
+     "esponse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResp" +
+     "onse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReq" +
+     "uest\032\026.BulkLoadHFileResponse\022F\n\013ExecServ" +
+     "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" +
+     "essorServiceResponse\022&\n\005Multi\022\r.MultiReq" +
+     "uest\032\016.MultiResponseBB\n*org.apache.hadoo" +
+     "p.hbase.protobuf.generatedB\014ClientProtos" +
+     "H\001\210\001\001\240\001\001"
    };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
        new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

@@ -30965,13 +31305,13 @@ public final class ClientProtos {
    internal_static_MultiRequest_fieldAccessorTable = new
      com.google.protobuf.GeneratedMessage.FieldAccessorTable(
        internal_static_MultiRequest_descriptor,
-       new java.lang.String[] { "RegionAction", "NonceGroup", });
+       new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", });
    internal_static_MultiResponse_descriptor =
      getDescriptor().getMessageTypes().get(25);
    internal_static_MultiResponse_fieldAccessorTable = new
      com.google.protobuf.GeneratedMessage.FieldAccessorTable(
        internal_static_MultiResponse_descriptor,
-       new java.lang.String[] { "RegionActionResult", });
+       new java.lang.String[] { "RegionActionResult", "Processed", });
    return null;
  }
};

@@ -387,10 +387,13 @@ message RegionActionResult {
message MultiRequest {
  repeated RegionAction regionAction = 1;
  optional uint64 nonceGroup = 2;
  optional Condition condition = 3;
}

message MultiResponse {
  repeated RegionActionResult regionActionResult = 1;
  // used for mutate to indicate processed only
  optional bool processed = 2;
}

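On the wire, a conditional call is therefore an ordinary MultiRequest whose new condition field carries the row/family/qualifier/comparator tuple; the outcome comes back in MultiResponse.processed. A sketch of reading that flag with the generated API, as inside the RegionServerCallable from the HTable hunk:

    // processed=true means the condition matched and the RowMutations were
    // applied atomically; false means the check failed and nothing was written.
    ClientProtos.MultiResponse response = getStub().multi(controller, request);
    boolean applied = response.hasProcessed() && response.getProcessed();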
@@ -18,29 +18,13 @@
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;

@@ -48,10 +32,11 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.io.MultipleIOException;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * A wrapper for HTable. Can be used to restrict privilege.

@@ -353,4 +338,10 @@ public class HTableWrapper implements HTableInterface {
    table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
        callback);
  }

  @Override
  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
      CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
    return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
  }
}

@@ -2973,6 +2973,89 @@ public class HRegion implements HeapSize { // , Writable{
    }
  }

  //TODO, Think that gets/puts and deletes should be refactored a bit so that
  //the getting of the lock happens before, so that you would just pass it into
  //the methods. So in the case of checkAndMutate you could just do lockRow,
  //get, put, unlockRow or something
  /**
   *
   * @throws IOException
   * @return true if the new put was executed, false otherwise
   */
  public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
      CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
      boolean writeToWAL)
      throws IOException {
    checkReadOnly();
    //TODO, add check for value length or maybe even better move this to the
    //client if this becomes a global setting
    checkResources();

    startRegionOperation();
    try {
      Get get = new Get(row);
      checkFamily(family);
      get.addColumn(family, qualifier);

      // Lock row - note that doBatchMutate will relock this row if called
      RowLock rowLock = getRowLock(get.getRow());
      // wait for all previous transactions to complete (with lock held)
      mvcc.waitForPreviousTransactionsComplete();
      try {
        List<Cell> result = get(get, false);

        boolean valueIsNull = comparator.getValue() == null ||
            comparator.getValue().length == 0;
        boolean matches = false;
        if (result.size() == 0 && valueIsNull) {
          matches = true;
        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
            valueIsNull) {
          matches = true;
        } else if (result.size() == 1 && !valueIsNull) {
          Cell kv = result.get(0);
          int compareResult = comparator.compareTo(kv.getValueArray(),
              kv.getValueOffset(), kv.getValueLength());
          switch (compareOp) {
            case LESS:
              matches = compareResult < 0;
              break;
            case LESS_OR_EQUAL:
              matches = compareResult <= 0;
              break;
            case EQUAL:
              matches = compareResult == 0;
              break;
            case NOT_EQUAL:
              matches = compareResult != 0;
              break;
            case GREATER_OR_EQUAL:
              matches = compareResult >= 0;
              break;
            case GREATER:
              matches = compareResult > 0;
              break;
            default:
              throw new RuntimeException("Unknown Compare op " + compareOp.name());
          }
        }
        //If matches put the new put or delete the new delete
        if (matches) {
          // All edits for the given row (across all column families) must
          // happen atomically.
          mutateRow(rm);
          this.checkAndMutateChecksPassed.increment();
          return true;
        }
        this.checkAndMutateChecksFailed.increment();
        return false;
      } finally {
        rowLock.release();
      }
    } finally {
      closeRegionOperation();
    }
  }
  private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
    // Currently this is only called for puts and deletes, so no nonces.
    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },

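checkAndRowMutate above reduces every CompareOp to a sign test on comparator.compareTo(storedValue). A small worked sketch of that mapping, assuming the BinaryComparator that HTable.checkAndMutate builds (so the comparison follows Bytes.compareTo(expected, stored)):

    // compareTo returns <0, 0, or >0 as the expected value sorts before,
    // equal to, or after the stored cell value.
    ByteArrayComparable comparator = new BinaryComparator(Bytes.toBytes("b"));
    int cmp = comparator.compareTo(Bytes.toBytes("c"), 0, 1);  // "b" vs "c": negative
    boolean matchesLess  = cmp < 0;   // CompareOp.LESS matches here
    boolean matchesEqual = cmp == 0;  // CompareOp.EQUAL does not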
@@ -18,23 +18,11 @@
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

@@ -163,11 +151,22 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Implements the regionserver RPC services.

@@ -383,6 +382,48 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    region.mutateRow(rm);
  }

  /**
   * Mutate a list of rows atomically, if the condition on the given
   * row/family/qualifier holds.
   *
   * @param region
   * @param actions
   * @param cellScanner if non-null, the mutation data -- the Cell content.
   * @param row
   * @param family
   * @param qualifier
   * @param compareOp
   * @param comparator
   * @throws IOException
   */
  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
      CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
    if (!region.getRegionInfo().isMetaTable()) {
      regionServer.cacheFlusher.reclaimMemStoreMemory();
    }
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
      }
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
        case PUT:
          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
          break;
        case DELETE:
          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
          break;
        default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
  }

  /**
   * Execute an append mutation.
   *

@@ -1705,6 +1746,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    List<CellScannable> cellsToReturn = null;
    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    Boolean processed = null;

    for (RegionAction regionAction : request.getRegionActionList()) {
      this.requestCount.add(regionAction.getActionCount());

@@ -1722,7 +1764,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
        // How does this call happen? It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index. TODO.
        try {
-         mutateRows(region, regionAction.getActionList(), cellScanner);
+         if (request.hasCondition()) {
+           Condition condition = request.getCondition();
+           byte[] row = condition.getRow().toByteArray();
+           byte[] family = condition.getFamily().toByteArray();
+           byte[] qualifier = condition.getQualifier().toByteArray();
+           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+           ByteArrayComparable comparator =
+               ProtobufUtil.toComparator(condition.getComparator());
+           processed = checkAndRowMutate(region, regionAction.getActionList(),
+               cellScanner, row, family, qualifier, compareOp, comparator);
+         } else {
+           mutateRows(region, regionAction.getActionList(), cellScanner);
+           processed = Boolean.TRUE;
+         }
        } catch (IOException e) {
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));

@@ -1738,6 +1793,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }
    if (processed != null) responseBuilder.setProcessed(processed);
    return responseBuilder.build();
  }

@@ -19,16 +19,10 @@

package org.apache.hadoop.hbase.rest.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

@@ -67,10 +61,15 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * HTable interface to remote tables accessed via REST gateway

@@ -851,4 +850,9 @@ public class RemoteHTable implements HTableInterface {
      throws ServiceException, Throwable {
    throw new UnsupportedOperationException("batchCoprocessorService not implemented");
  }

  @Override
  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
      CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
    throw new UnsupportedOperationException("checkAndMutate not implemented");
  }
}

@@ -0,0 +1,103 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertTrue;

@Category(MediumTests.class)
public class TestCheckAndMutate {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCheckAndMutate() throws Exception {
    final TableName tableName = TableName.valueOf("TestPutWithDelete");
    final byte[] rowKey = Bytes.toBytes("12345");
    final byte[] family = Bytes.toBytes("cf");
    HTable table = TEST_UTIL.createTable(tableName, family);
    TEST_UTIL.waitTableAvailable(tableName.getName(), 5000);
    try {
      // put one row
      Put put = new Put(rowKey);
      put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
      put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b"));
      put.add(family, Bytes.toBytes("C"), Bytes.toBytes("c"));
      table.put(put);
      // get row back and assert the values
      Get get = new Get(rowKey);
      Result result = table.get(get);
      assertTrue("Column A value should be a",
          Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a"));
      assertTrue("Column B value should be b",
          Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b"));
      assertTrue("Column C value should be c",
          Bytes.toString(result.getValue(family, Bytes.toBytes("C"))).equals("c"));

      // put the same row again with C column deleted
      RowMutations rm = new RowMutations(rowKey);
      put = new Put(rowKey);
      put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
      put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b"));
      rm.add(put);
      Delete del = new Delete(rowKey);
      del.deleteColumn(family, Bytes.toBytes("C"));
      rm.add(del);
      boolean res = table.checkAndMutate(rowKey, family, Bytes.toBytes("A"),
          CompareFilter.CompareOp.EQUAL, Bytes.toBytes("a"), rm);
      assertTrue(res);

      // get row back and assert the values
      get = new Get(rowKey);
      result = table.get(get);
      assertTrue("Column A value should be a",
          Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a"));
      assertTrue("Column B value should be b",
          Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b"));
      assertTrue("Column C should not exist",
          result.getValue(family, Bytes.toBytes("C")) == null);
    } finally {
      table.close();
    }
  }
}

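The new test exercises only the success path. A complementary failure-path assertion (my sketch, not part of the commit; it assumes the same setup plus a static import of org.junit.Assert.assertFalse) would pin down that a non-matching check applies nothing:

    // Expect "wrong" in cf:A while the stored value is "a": the check fails,
    // checkAndMutate returns false, and none of the mutations in rm are applied.
    boolean res = table.checkAndMutate(rowKey, family, Bytes.toBytes("A"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("wrong"), rm);
    assertFalse(res);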
@@ -1509,7 +1509,7 @@ public class TestHRegion {
      Delete delete = new Delete(row1);
      delete.deleteFamily(fam1);
      res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
-         delete, true);
+         put, true);
      assertEquals(false, res);
    } finally {
      HRegion.closeHRegion(this.region);