HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3098)

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Toshihiro Suzuki 2021-03-30 09:18:56 +09:00 committed by GitHub
parent 57a49f5ca7
commit 46f7d9dd4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 523 additions and 65 deletions

View File

@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
@ -965,6 +966,9 @@ public final class ProtobufUtil {
*/
public static Mutation toMutation(final MutationProto proto) throws IOException {
MutationType type = proto.getMutateType();
if (type == MutationType.INCREMENT) {
return toIncrement(proto, null);
}
if (type == MutationType.APPEND) {
return toAppend(proto, null);
}
@ -3722,6 +3726,37 @@ public final class ProtobufUtil {
}
}
/**
 * Create a protocol buffer Condition from the given parameters.
 *
 * When a {@code filter} is supplied it takes precedence and the family/qualifier/op/value
 * comparison fields are left unset; otherwise the value is compared with a BinaryComparator
 * under the given compare operator.
 *
 * @return a protocol buffer Condition
 * @throws IOException if the filter or comparator cannot be converted
 */
public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
    final TimeRange timeRange) throws IOException {
  ClientProtos.Condition.Builder builder =
    ClientProtos.Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
  if (filter == null) {
    // Comparison form: a null qualifier is encoded as an empty byte array.
    byte[] q = qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier;
    builder.setFamily(UnsafeByteOperations.unsafeWrap(family));
    builder.setQualifier(UnsafeByteOperations.unsafeWrap(q));
    builder.setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)));
    builder.setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
  } else {
    // Filter form: the comparison fields are not set.
    builder.setFilter(ProtobufUtil.toFilter(filter));
  }
  builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange));
  return builder.build();
}
/**
 * Create a protocol buffer Condition that checks the given row against a filter.
 *
 * @return a protocol buffer Condition backed by {@code filter}
 * @throws IOException if the filter cannot be converted
 */
public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter,
    final TimeRange timeRange) throws IOException {
  // Delegate to the full form; comparison fields are unused when a filter is present.
  return toCondition(row, null, null, null, null, filter, timeRange);
}
/**
 * Create a protocol buffer Condition that compares the value of a cell (row, family,
 * qualifier) with the given value under the given compare operator.
 *
 * @return a protocol buffer Condition
 * @throws IOException if the comparator cannot be converted
 */
public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
    final byte[] qualifier, final CompareOperator op, final byte[] value,
    final TimeRange timeRange) throws IOException {
  // Delegate to the full form with no filter.
  return toCondition(row, family, qualifier, op, value, null, timeRange);
}
public static List<LogEntry> toBalancerDecisionResponse(
HBaseProtos.LogEntry logEntry) {
try {

View File

@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
@ -101,7 +100,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@ -213,7 +211,7 @@ public final class RequestConverter {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
}
return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.setCondition(ProtobufUtil.toCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@ -227,8 +225,8 @@ public final class RequestConverter {
final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
value, filter, timeRange), nonceGroup, nonce);
return buildMultiRequest(regionName, rowMutations, ProtobufUtil.toCondition(row, family,
qualifier, op, value, filter, timeRange), nonceGroup, nonce);
}
/**
@ -609,8 +607,9 @@ public final class RequestConverter {
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(),
cam.getTimeRange()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
@ -964,31 +963,6 @@ public final class RequestConverter {
return regionBuilder.build();
}
/**
 * Create a protocol buffer Condition.
 *
 * @return a Condition
 * @throws IOException if the filter or comparator cannot be converted
 * @deprecated The logic now lives in {@code ProtobufUtil.toCondition}; this method only
 *   delegates to it and is kept for backward compatibility with existing callers.
 */
@Deprecated
public static Condition buildCondition(final byte[] row, final byte[] family,
    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
    final TimeRange timeRange) throws IOException {
  // Previously this duplicated ProtobufUtil.toCondition byte-for-byte; delegate instead so
  // the two converters cannot drift apart.
  return ProtobufUtil.toCondition(row, family, qualifier, op, value, filter, timeRange);
}
/**
* Create a protocol buffer AddColumnRequest
*

View File

@ -132,8 +132,8 @@ message GetResponse {
}
/**
* Condition to check if the value of a given cell (row,
* family, qualifier) matches a value via a given comparator.
* Condition to check if the value of a given cell (row, family, qualifier) matches a value via a
* given comparator or the value of a given cell matches a given filter.
*
* Condition is used in check and mutate operations.
*/

View File

@ -37,6 +37,7 @@ message MutateRowsRequest {
optional uint64 nonce_group = 2;
optional uint64 nonce = 3;
optional RegionSpecifier region = 4;
repeated Condition condition = 5;
}
message MutateRowsResponse {

View File

@ -24,33 +24,44 @@ import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
/**
* This class demonstrates how to implement atomic multi row transactions using
* {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
* and Coprocessor endpoints.
* This class implements atomic multi row transactions using
* {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
* endpoints. We can also specify some conditions to perform conditional update.
*
* Defines a protocol to perform multi row transactions.
* See {@link MultiRowMutationEndpoint} for the implementation.
@ -61,18 +72,29 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.
* <br>
* Example:
* <code>
* List&lt;Mutation&gt; mutations = ...;
* Put p1 = new Put(row1);
* Put p2 = new Put(row2);
* Put p = new Put(row1);
* Delete d = new Delete(row2);
* Increment i = new Increment(row3);
* Append a = new Append(row4);
* ...
* Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
* Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
* Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
* Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d);
* Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i);
* Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a);
*
* MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
* mrmBuilder.addMutationRequest(m1);
* mrmBuilder.addMutationRequest(m2);
* mrmBuilder.addMutationRequest(m3);
* mrmBuilder.addMutationRequest(m4);
*
* // We can also specify conditions to perform conditional update
* mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
* CompareOperator.EQUAL, value, TimeRange.allTime()));
*
* CoprocessorRpcChannel channel = t.coprocessorService(ROW);
* MultiRowMutationService.BlockingInterface service =
* MultiRowMutationService.newBlockingStub(channel);
* MultiRowMutationService.newBlockingStub(channel);
* MutateRowsRequest mrm = mrmBuilder.build();
* service.mutateRows(null, mrm);
* </code>
@ -80,11 +102,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class);
private RegionCoprocessorEnvironment env;
@Override
public void mutateRows(RpcController controller, MutateRowsRequest request,
RpcCallback<MutateRowsResponse> done) {
MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
List<Region.RowLock> rowLocks = null;
try {
// set of rows to lock, sorted to avoid deadlocks
SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@ -94,7 +121,9 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
mutations.add(ProtobufUtil.toMutation(m));
}
RegionInfo regionInfo = env.getRegion().getRegionInfo();
Region region = env.getRegion();
RegionInfo regionInfo = region.getRegionInfo();
for (Mutation m : mutations) {
// check whether rows are in range for this region
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
@ -111,16 +140,134 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
}
rowsToLock.add(m.getRow());
}
// call utility method on region
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
boolean matches = true;
if (request.getConditionCount() > 0) {
// Get row locks for the mutations and the conditions
rowLocks = new ArrayList<>();
for (ClientProtos.Condition condition : request.getConditionList()) {
rowsToLock.add(condition.getRow().toByteArray());
}
for (byte[] row : rowsToLock) {
try {
Region.RowLock rowLock = region.getRowLock(row, false); // write lock
rowLocks.add(rowLock);
} catch (IOException ioe) {
LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
this, ioe);
throw ioe;
}
}
// Check if all the conditions match
for (ClientProtos.Condition condition : request.getConditionList()) {
if (!matches(region, condition)) {
matches = false;
break;
}
}
}
if (matches) {
// call utility method on region
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (rowLocks != null) {
// Release the acquired row locks
for (Region.RowLock rowLock : rowLocks) {
rowLock.release();
}
}
}
done.run(response);
}
/**
 * Check whether the given condition matches the current contents of its row.
 *
 * A condition carries either a filter, or a (family, qualifier, compare-op, comparator)
 * tuple; the branches below handle the two forms.
 *
 * @param region the region to read the condition's row from
 * @param condition the protobuf condition to evaluate
 * @return true if the condition matches
 * @throws IOException if the read fails or the family does not exist in the table
 */
private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
  byte[] row = condition.getRow().toByteArray();

  Filter filter = null;
  byte[] family = null;
  byte[] qualifier = null;
  CompareOperator op = null;
  ByteArrayComparable comparator = null;
  if (condition.hasFilter()) {
    filter = ProtobufUtil.toFilter(condition.getFilter());
  } else {
    family = condition.getFamily().toByteArray();
    qualifier = condition.getQualifier().toByteArray();
    op = CompareOperator.valueOf(condition.getCompareType().name());
    comparator = ProtobufUtil.toComparator(condition.getComparator());
  }

  TimeRange timeRange = condition.hasTimeRange() ?
    ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();

  Get get = new Get(row);
  if (family != null) {
    checkFamily(region, family);
    get.addColumn(family, qualifier);
  }
  if (filter != null) {
    get.setFilter(filter);
  }
  // timeRange can never be null here (it falls back to allTime() above), so the previous
  // "timeRange != null" guard was dead; apply the range unconditionally.
  get.setTimeRange(timeRange.getMin(), timeRange.getMax());

  List<Cell> result = region.get(get, false);

  if (filter != null) {
    // Filter form: the condition matches when the filtered read returns anything.
    return !result.isEmpty();
  }

  // Comparison form. A null/empty expected value matches a missing cell or an empty stored
  // value; otherwise compare the single returned cell's value with the comparator.
  boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
  if (result.isEmpty() && valueIsNull) {
    return true;
  } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
    return true;
  } else if (result.size() == 1 && !valueIsNull) {
    Cell cell = result.get(0);
    int compareResult = PrivateCellUtil.compareValue(cell, comparator);
    return matches(op, compareResult);
  }
  return false;
}
/**
 * Verify that the given column family exists in the region's table schema.
 *
 * @throws NoSuchColumnFamilyException if the family is not part of the table
 */
private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
  if (!region.getTableDescriptor().hasColumnFamily(family)) {
    // Identify the region by name; the previous message embedded "this" (the coprocessor
    // endpoint instance), whose toString() carries no useful context for the caller.
    throw new NoSuchColumnFamilyException(
      "Column family " + Bytes.toString(family) + " does not exist in region "
        + region.getRegionInfo().getRegionNameAsString()
        + " in table " + region.getTableDescriptor());
  }
}
/**
 * Evaluate a comparator result against the requested compare operation.
 *
 * @param op the compare operation from the condition
 * @param compareResult result of comparing the stored cell value with the expected value
 *   (negative, zero, or positive)
 * @return true when {@code compareResult} satisfies {@code op}
 */
private boolean matches(CompareOperator op, int compareResult) {
  if (op == CompareOperator.LESS) {
    return compareResult < 0;
  } else if (op == CompareOperator.LESS_OR_EQUAL) {
    return compareResult <= 0;
  } else if (op == CompareOperator.EQUAL) {
    return compareResult == 0;
  } else if (op == CompareOperator.NOT_EQUAL) {
    return compareResult != 0;
  } else if (op == CompareOperator.GREATER_OR_EQUAL) {
    return compareResult >= 0;
  } else if (op == CompareOperator.GREATER) {
    return compareResult > 0;
  }
  throw new RuntimeException("Unknown Compare op " + op.name());
}
@Override
public Iterable<Service> getServices() {
return Collections.singleton(this);

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.io.TimeRange;
@ -267,30 +268,330 @@ public class TestFromClientSide5 extends FromClientSideBase {
LOG.info("Starting testMultiRowMutation");
final TableName tableName = name.getTableName();
final byte [] ROW1 = Bytes.toBytes("testRow1");
final byte [] ROW2 = Bytes.toBytes("testRow2");
final byte [] ROW3 = Bytes.toBytes("testRow3");
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
Put p = new Put(ROW);
p.addColumn(FAMILY, QUALIFIER, VALUE);
MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
// Add initial data
t.batch(Arrays.asList(
new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE),
new Put(ROW2).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(1L)),
new Put(ROW3).addColumn(FAMILY, QUALIFIER, VALUE)
), new Object[3]);
p = new Put(ROW1);
p.addColumn(FAMILY, QUALIFIER, VALUE);
MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p);
// Execute MultiRowMutation
Put put = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put);
Delete delete = new Delete(ROW1);
MutationProto m2 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
Increment increment = new Increment(ROW2).addColumn(FAMILY, QUALIFIER, 1L);
MutationProto m3 = ProtobufUtil.toMutation(MutationType.INCREMENT, increment);
Append append = new Append(ROW3).addColumn(FAMILY, QUALIFIER, VALUE);
MutationProto m4 = ProtobufUtil.toMutation(MutationType.APPEND, append);
MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
mrmBuilder.addMutationRequest(m1);
mrmBuilder.addMutationRequest(m2);
MutateRowsRequest mrm = mrmBuilder.build();
mrmBuilder.addMutationRequest(m3);
mrmBuilder.addMutationRequest(m4);
CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrm);
Get g = new Get(ROW);
Result r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
g = new Get(ROW1);
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
service.mutateRows(null, mrmBuilder.build());
// Assert
Result r = t.get(new Get(ROW));
assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
r = t.get(new Get(ROW1));
assertTrue(r.isEmpty());
r = t.get(new Get(ROW2));
assertEquals(2L, Bytes.toLong(r.getValue(FAMILY, QUALIFIER)));
r = t.get(new Get(ROW3));
assertEquals(Bytes.toString(VALUE) + Bytes.toString(VALUE),
Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
}
}
@Test
public void testMultiRowMutationWithSingleConditionWhenConditionMatches() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2 so the condition below can match its current value.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2));

    // Two puts and a delete, guarded by a single matching condition on row2.
    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(row2, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, value2, null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // The condition matched, so all three mutations must have been applied.
    assertEquals(Bytes.toString(VALUE),
      Bytes.toString(t.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)));
    assertEquals(Bytes.toString(value1),
      Bytes.toString(t.get(new Get(row1)).getValue(FAMILY, QUALIFIER)));
    assertTrue(t.get(new Get(row2)).isEmpty());
  }
}
@Test
public void testMultiRowMutationWithSingleConditionWhenConditionNotMatch() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2 with value2; the condition below expects value1 and must therefore fail.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2));

    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(row2, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, value1, null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // The condition did not match, so none of the mutations may have been applied.
    assertTrue(t.get(new Get(ROW)).isEmpty());
    assertTrue(t.get(new Get(row1)).isEmpty());
    assertEquals(Bytes.toString(value2),
      Bytes.toString(t.get(new Get(row2)).getValue(FAMILY, QUALIFIER)));
  }
}
@Test
public void testMultiRowMutationWithMultipleConditionsWhenConditionsMatch() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2 only; ROW stays empty so a null-value condition on it can match.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2));

    // Guard the mutations with two conditions, both of which match.
    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(ROW, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, null, null))
      .addCondition(ProtobufUtil.toCondition(row2, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, value2, null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // Both conditions matched, so all mutations must have been applied.
    assertEquals(Bytes.toString(VALUE),
      Bytes.toString(t.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)));
    assertEquals(Bytes.toString(value1),
      Bytes.toString(t.get(new Get(row1)).getValue(FAMILY, QUALIFIER)));
    assertTrue(t.get(new Get(row2)).isEmpty());
  }
}
@Test
public void testMultiRowMutationWithMultipleConditionsWhenConditionsNotMatch() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2 with value2; the second condition below expects value1 and must fail.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2));

    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(row1, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, null, null))
      .addCondition(ProtobufUtil.toCondition(row2, FAMILY, QUALIFIER,
        CompareOperator.EQUAL, value1, null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // One condition failed, so none of the mutations may have been applied.
    assertTrue(t.get(new Get(ROW)).isEmpty());
    assertTrue(t.get(new Get(row1)).isEmpty());
    assertEquals(Bytes.toString(value2),
      Bytes.toString(t.get(new Get(row2)).getValue(FAMILY, QUALIFIER)));
  }
}
@Test
public void testMultiRowMutationWithFilterConditionWhenConditionMatches() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] qualifier2 = Bytes.toBytes("testQualifier2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  final byte[] value3 = Bytes.toBytes("testValue3");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2 with two columns so both filters below can match.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2)
      .addColumn(FAMILY, qualifier2, value3));

    // Guard the mutations with a FilterList that requires both stored values.
    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(row2, new FilterList(
        new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, value2),
        new SingleColumnValueFilter(FAMILY, qualifier2, CompareOperator.EQUAL, value3)), null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // The filter condition matched, so every mutation must have been applied.
    assertEquals(Bytes.toString(VALUE),
      Bytes.toString(t.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)));
    assertEquals(Bytes.toString(value1),
      Bytes.toString(t.get(new Get(row1)).getValue(FAMILY, QUALIFIER)));
    assertTrue(t.get(new Get(row2)).isEmpty());
  }
}
@Test
public void testMultiRowMutationWithFilterConditionWhenConditionNotMatch() throws Exception {
  final TableName tableName = name.getTableName();
  final byte[] row1 = Bytes.toBytes("testRow1");
  final byte[] row2 = Bytes.toBytes("testRow2");
  final byte[] qualifier2 = Bytes.toBytes("testQualifier2");
  final byte[] value1 = Bytes.toBytes("testValue1");
  final byte[] value2 = Bytes.toBytes("testValue2");
  final byte[] value3 = Bytes.toBytes("testValue3");
  try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
    // Seed row2; qualifier2 holds value3, so the second filter below (expecting value2)
    // cannot match.
    t.put(new Put(row2).addColumn(FAMILY, QUALIFIER, value2)
      .addColumn(FAMILY, qualifier2, value3));

    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT,
        new Put(row1).addColumn(FAMILY, QUALIFIER, value1)))
      .addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(row2)))
      .addCondition(ProtobufUtil.toCondition(row2, new FilterList(
        new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, value2),
        new SingleColumnValueFilter(FAMILY, qualifier2, CompareOperator.EQUAL, value2)), null))
      .build();

    CoprocessorRpcChannel channel = t.coprocessorService(ROW);
    MultiRowMutationService.BlockingInterface service =
      MultiRowMutationService.newBlockingStub(channel);
    service.mutateRows(null, request);

    // The filter condition failed, so none of the mutations may have been applied.
    assertTrue(t.get(new Get(ROW)).isEmpty());
    assertTrue(t.get(new Get(row1)).isEmpty());
    assertEquals(Bytes.toString(value2),
      Bytes.toString(t.get(new Get(row2)).getValue(FAMILY, QUALIFIER)));
  }
}

View File

@ -233,8 +233,8 @@ public class TestMalformedCellFromClient {
builder.setAtomic(true);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = RequestConverter
.buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
CompareOperator.EQUAL, new byte[10], null, null);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {