diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 5bba1e1beca..dc7f7d23a10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ExtendedCellBuilder; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; @@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; @@ -751,6 +753,9 @@ public final class ProtobufUtil { */ public static Mutation toMutation(final MutationProto proto) throws IOException { MutationType type = proto.getMutateType(); + if (type == MutationType.INCREMENT) { + return toIncrement(proto, null); + } if (type == MutationType.APPEND) { return toAppend(proto, null); } @@ -1790,4 +1795,51 @@ public final class ProtobufUtil { .setTo(timeRange.getMax()) .build(); } + + public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) { + if (timeRange == null) { + return TimeRange.allTime(); + } + if (timeRange.hasFrom()) { + if (timeRange.hasTo()) { + return TimeRange.between(timeRange.getFrom(), timeRange.getTo()); + } else { + return TimeRange.from(timeRange.getFrom()); + } + } else { + return TimeRange.until(timeRange.getTo()); + } + } + + public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, + final TimeRange timeRange) throws IOException { + + ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder() + .setRow(ByteStringer.wrap(row)); + + if (filter != null) { + builder.setFilter(ProtobufUtil.toFilter(filter)); + } else { + builder.setFamily(ByteStringer.wrap(family)) + .setQualifier(ByteStringer.wrap( + qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)) + .setComparator( + ProtobufUtil.toComparator(new BinaryComparator(value))) + .setCompareType(HBaseProtos.CompareType.valueOf(op.name())); + } + + return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build(); + } + + public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter, + final TimeRange timeRange) throws IOException { + return toCondition(row, null, null, null, null, filter, timeRange); + } + + public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOperator op, final byte[] value, + final TimeRange timeRange) throws IOException { + return toCondition(row, family, qualifier, op, value, null, timeRange); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 9064ded50c1..cb471d2d135 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; @@ -917,6 +918,9 @@ public final class ProtobufUtil { */ public static Mutation toMutation(final MutationProto proto) throws IOException { MutationType type = proto.getMutateType(); + if (type == MutationType.INCREMENT) { + return toIncrement(proto, null); + } if (type == MutationType.APPEND) { return toAppend(proto, null); } @@ -3632,6 +3636,37 @@ public final class ProtobufUtil { } } + public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, + final TimeRange timeRange) throws IOException { + + ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder() + .setRow(UnsafeByteOperations.unsafeWrap(row)); + + if (filter != null) { + builder.setFilter(ProtobufUtil.toFilter(filter)); + } else { + builder.setFamily(UnsafeByteOperations.unsafeWrap(family)) + .setQualifier(UnsafeByteOperations.unsafeWrap( + qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)) + .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value))) + .setCompareType(HBaseProtos.CompareType.valueOf(op.name())); + } + + return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build(); + } + + public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter, + final TimeRange timeRange) throws IOException { + return toCondition(row, null, null, null, null, filter, timeRange); + } + + public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOperator op, final byte[] value, + final TimeRange timeRange) throws IOException { + return toCondition(row, family, qualifier, op, value, null, timeRange); + } + public static List toBalancerDecisionResponse( HBaseProtos.LogEntry logEntry) { try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 21644f4373e..bdae13bbfce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.master.RegionState; @@ -102,7 +101,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; @@ -257,7 +255,7 @@ public final class RequestConverter { builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation)); } return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) - .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange)) + .setCondition(ProtobufUtil.toCondition(row, family, qualifier, op, value, filter, timeRange)) .build(); } @@ -271,8 +269,8 @@ public final class RequestConverter { final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange, final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException { - return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op, - value, filter, timeRange), nonceGroup, nonce); + return buildMultiRequest(regionName, rowMutations, ProtobufUtil.toCondition(row, family, + qualifier, op, value, filter, timeRange), nonceGroup, nonce); } /** @@ -666,8 +664,9 @@ public final class RequestConverter { getRegionActionBuilderWithRegion(builder, regionName); CheckAndMutate cam = (CheckAndMutate) action.getAction(); - builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(), - cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange())); + builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(), + cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(), + cam.getTimeRange())); if (cam.getAction() instanceof Put) { actionBuilder.clear(); @@ -832,8 +831,9 @@ public final class RequestConverter { getRegionActionBuilderWithRegion(builder, regionName); CheckAndMutate cam = (CheckAndMutate) action.getAction(); - builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(), - cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange())); + builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(), + cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(), + cam.getTimeRange())); if (cam.getAction() instanceof Put) { actionBuilder.clear(); @@ -1187,31 +1187,6 @@ public final class RequestConverter { return regionBuilder.build(); } - /** - * Create a protocol buffer Condition - * - * @return a Condition - * @throws IOException - */ - public static Condition buildCondition(final byte[] row, final byte[] family, - final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, - final TimeRange timeRange) throws IOException { - - Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row)); - - if (filter != null) { - builder.setFilter(ProtobufUtil.toFilter(filter)); - } else { - builder.setFamily(UnsafeByteOperations.unsafeWrap(family)) - .setQualifier(UnsafeByteOperations.unsafeWrap( - qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)) - .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value))) - .setCompareType(CompareType.valueOf(op.name())); - } - - return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build(); - } - /** * Create a protocol buffer AddColumnRequest * @@ -1245,7 +1220,7 @@ public final class RequestConverter { final long nonceGroup, final long nonce) { DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder(); - builder.setTableName(ProtobufUtil.toProtoTableName((tableName))); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); builder.setColumnName(UnsafeByteOperations.unsafeWrap(columnName)); builder.setNonceGroup(nonceGroup); builder.setNonce(nonce); diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 1b8b98642b4..13917b6d66c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -132,8 +132,8 @@ message GetResponse { } /** - * Condition to check if the value of a given cell (row, - * family, qualifier) matches a value via a given comparator. + * Condition to check if the value of a given cell (row, family, qualifier) matches a value via a + * given comparator or the value of a given cell matches a given filter. * * Condition is used in check and mutate operations. */ diff --git a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto index d3140e9b2e0..571e633c15b 100644 --- a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto +++ b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto @@ -37,6 +37,7 @@ message MutateRowsRequest { optional uint64 nonce_group = 2; optional uint64 nonce = 3; optional RegionSpecifier region = 4; + repeated Condition condition = 5; } message MutateRowsResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java index d2307735c1c..c840d545168 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java @@ -25,31 +25,43 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.WrongRegionException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; /** - * This class demonstrates how to implement atomic multi row transactions using - * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} - * and Coprocessor endpoints. + * This class implements atomic multi row transactions using + * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor + * endpoints. We can also specify some conditions to perform conditional update. * * Defines a protocol to perform multi row transactions. * See {@link MultiRowMutationEndpoint} for the implementation. @@ -60,18 +72,29 @@ import com.google.protobuf.Service; *
* Example: * - * List<Mutation> mutations = ...; - * Put p1 = new Put(row1); - * Put p2 = new Put(row2); + * Put p = new Put(row1); + * Delete d = new Delete(row2); + * Increment i = new Increment(row3); + * Append a = new Append(row4); * ... - * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1); - * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2); + * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p); + * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d); + * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i); + * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a); + * * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); * mrmBuilder.addMutationRequest(m1); * mrmBuilder.addMutationRequest(m2); + * mrmBuilder.addMutationRequest(m3); + * mrmBuilder.addMutationRequest(m4); + * + * // We can also specify conditions to preform conditional update + * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER, + * CompareOperator.EQUAL, value, TimeRange.allTime())); + * * CoprocessorRpcChannel channel = t.coprocessorService(ROW); * MultiRowMutationService.BlockingInterface service = - * MultiRowMutationService.newBlockingStub(channel); + * MultiRowMutationService.newBlockingStub(channel); * MutateRowsRequest mrm = mrmBuilder.build(); * service.mutateRows(null, mrm); * @@ -79,11 +102,16 @@ import com.google.protobuf.Service; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor { + private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class); + private RegionCoprocessorEnvironment env; + @Override public void mutateRows(RpcController controller, MutateRowsRequest request, RpcCallback done) { MutateRowsResponse response = MutateRowsResponse.getDefaultInstance(); + + List rowLocks = null; try { // set of rows to lock, sorted to avoid deadlocks SortedSet rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR); @@ -93,7 +121,9 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements mutations.add(ProtobufUtil.toMutation(m)); } - RegionInfo regionInfo = env.getRegion().getRegionInfo(); + Region region = env.getRegion(); + + RegionInfo regionInfo = region.getRegionInfo(); for (Mutation m : mutations) { // check whether rows are in range for this region if (!HRegion.rowIsInRange(regionInfo, m.getRow())) { @@ -110,16 +140,134 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements } rowsToLock.add(m.getRow()); } - // call utility method on region - long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; - long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE; - env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); + + boolean matches = true; + if (request.getConditionCount() > 0) { + // Get row locks for the mutations and the conditions + rowLocks = new ArrayList<>(); + for (ClientProtos.Condition condition : request.getConditionList()) { + rowsToLock.add(condition.getRow().toByteArray()); + } + for (byte[] row : rowsToLock) { + try { + Region.RowLock rowLock = region.getRowLock(row, false); // write lock + rowLocks.add(rowLock); + } catch (IOException ioe) { + LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row), + this, ioe); + throw ioe; + } + } + + // Check if all the conditions match + for (ClientProtos.Condition condition : request.getConditionList()) { + if (!matches(region, condition)) { + matches = false; + break; + } + } + } + + if (matches) { + // call utility method on region + long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; + long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE; + region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (rowLocks != null) { + // Release the acquired row locks + for (Region.RowLock rowLock : rowLocks) { + rowLock.release(); + } + } } done.run(response); } + private boolean matches(Region region, ClientProtos.Condition condition) throws IOException { + byte[] row = condition.getRow().toByteArray(); + + Filter filter = null; + byte[] family = null; + byte[] qualifier = null; + CompareOperator op = null; + ByteArrayComparable comparator = null; + + if (condition.hasFilter()) { + filter = ProtobufUtil.toFilter(condition.getFilter()); + } else { + family = condition.getFamily().toByteArray(); + qualifier = condition.getQualifier().toByteArray(); + op = CompareOperator.valueOf(condition.getCompareType().name()); + comparator = ProtobufUtil.toComparator(condition.getComparator()); + } + + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); + + Get get = new Get(row); + if (family != null) { + checkFamily(region, family); + get.addColumn(family, qualifier); + } + if (filter != null) { + get.setFilter(filter); + } + if (timeRange != null) { + get.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } + + List result = region.get(get, false); + boolean matches = false; + if (filter != null) { + if (!result.isEmpty()) { + matches = true; + } + } else { + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; + if (result.isEmpty() && valueIsNull) { + matches = true; + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { + matches = true; + } else if (result.size() == 1 && !valueIsNull) { + Cell kv = result.get(0); + int compareResult = PrivateCellUtil.compareValue(kv, comparator); + matches = matches(op, compareResult); + } + } + return matches; + } + + private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException { + if (!region.getTableDescriptor().hasColumnFamily(family)) { + throw new NoSuchColumnFamilyException( + "Column family " + Bytes.toString(family) + " does not exist in region " + this + + " in table " + region.getTableDescriptor()); + } + } + + private boolean matches(CompareOperator op, int compareResult) { + switch (op) { + case LESS: + return compareResult < 0; + case LESS_OR_EQUAL: + return compareResult <= 0; + case EQUAL: + return compareResult == 0; + case NOT_EQUAL: + return compareResult != 0; + case GREATER_OR_EQUAL: + return compareResult >= 0; + case GREATER: + return compareResult > 0; + default: + throw new RuntimeException("Unknown Compare op " + op.name()); + } + } + @Override public Iterable getServices() { return Collections.singleton(this); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java index 19256fd838b..809fd2ad291 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.io.TimeRange; @@ -265,30 +266,330 @@ public class TestFromClientSide5 extends FromClientSideBase { LOG.info("Starting testMultiRowMutation"); final TableName tableName = name.getTableName(); final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] ROW3 = Bytes.toBytes("testRow3"); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { - Put p = new Put(ROW); - p.addColumn(FAMILY, QUALIFIER, VALUE); - MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p); + // Add initial data + t.batch(Arrays.asList( + new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE), + new Put(ROW2).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(1L)), + new Put(ROW3).addColumn(FAMILY, QUALIFIER, VALUE) + ), new Object[3]); - p = new Put(ROW1); - p.addColumn(FAMILY, QUALIFIER, VALUE); - MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p); + // Execute MultiRowMutation + Put put = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put); + + Delete delete = new Delete(ROW1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + Increment increment = new Increment(ROW2).addColumn(FAMILY, QUALIFIER, 1L); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.INCREMENT, increment); + + Append append = new Append(ROW3).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m4 = ProtobufUtil.toMutation(MutationType.APPEND, append); MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); mrmBuilder.addMutationRequest(m1); mrmBuilder.addMutationRequest(m2); - MutateRowsRequest mrm = mrmBuilder.build(); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addMutationRequest(m4); + CoprocessorRpcChannel channel = t.coprocessorService(ROW); MultiRowMutationService.BlockingInterface service = MultiRowMutationService.newBlockingStub(channel); - service.mutateRows(null, mrm); - Get g = new Get(ROW); - Result r = t.get(g); - assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); - g = new Get(ROW1); - r = t.get(g); - assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW1)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW2)); + assertEquals(2L, Bytes.toLong(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW3)); + assertEquals(Bytes.toString(VALUE) + Bytes.toString(VALUE), + Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + } + } + + @Test + public void testMultiRowMutationWithSingleConditionWhenConditionMatches() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER, + CompareOperator.EQUAL, VALUE2, null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW1)); + assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW2)); + assertTrue(r.isEmpty()); + } + } + + @Test + public void testMultiRowMutationWithSingleConditionWhenConditionNotMatch() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER, + CompareOperator.EQUAL, VALUE1, null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW1)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW2)); + assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + } + } + + @Test + public void testMultiRowMutationWithMultipleConditionsWhenConditionsMatch() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW, FAMILY, QUALIFIER, + CompareOperator.EQUAL, null, null)); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER, + CompareOperator.EQUAL, VALUE2, null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW1)); + assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW2)); + assertTrue(r.isEmpty()); + } + } + + @Test + public void testMultiRowMutationWithMultipleConditionsWhenConditionsNotMatch() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW1, FAMILY, QUALIFIER, + CompareOperator.EQUAL, null, null)); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER, + CompareOperator.EQUAL, VALUE1, null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW1)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW2)); + assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + } + } + + @Test + public void testMultiRowMutationWithFilterConditionWhenConditionMatches() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + final byte [] VALUE3 = Bytes.toBytes("testValue3"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2) + .addColumn(FAMILY, QUALIFIER2, VALUE3)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList( + new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2), + new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE3)), null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW1)); + assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); + + r = t.get(new Get(ROW2)); + assertTrue(r.isEmpty()); + } + } + + @Test + public void testMultiRowMutationWithFilterConditionWhenConditionNotMatch() throws Exception { + final TableName tableName = name.getTableName(); + final byte [] ROW1 = Bytes.toBytes("testRow1"); + final byte [] ROW2 = Bytes.toBytes("testRow2"); + final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2"); + final byte [] VALUE1 = Bytes.toBytes("testValue1"); + final byte [] VALUE2 = Bytes.toBytes("testValue2"); + final byte [] VALUE3 = Bytes.toBytes("testValue3"); + + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + // Add initial data + t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2) + .addColumn(FAMILY, QUALIFIER2, VALUE3)); + + // Execute MultiRowMutation with conditions + Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); + MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1); + Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1); + MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2); + Delete delete = new Delete(ROW2); + MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete); + + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + mrmBuilder.addMutationRequest(m1); + mrmBuilder.addMutationRequest(m2); + mrmBuilder.addMutationRequest(m3); + mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList( + new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2), + new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE2)), null)); + + CoprocessorRpcChannel channel = t.coprocessorService(ROW); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + service.mutateRows(null, mrmBuilder.build()); + + // Assert + Result r = t.get(new Get(ROW)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW1)); + assertTrue(r.isEmpty()); + + r = t.get(new Get(ROW2)); + assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index c23bc7d7dca..781611095bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -237,8 +237,8 @@ public class TestMalformedCellFromClient { builder.setAtomic(true); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); - ClientProtos.Condition condition = RequestConverter - .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null); + ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null, + CompareOperator.EQUAL, new byte[10], null, null); for (Mutation mutation : rm.getMutations()) { ClientProtos.MutationProto.MutationType mutateType = null; if (mutation instanceof Put) {