From 6aba045aaed0f2f729e6a7409fdefaac35815b1e Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 24 Mar 2018 00:00:36 +0800 Subject: [PATCH] HBASE-19504 Add TimeRange support into checkAndMutate Signed-off-by: Michael Stack --- .../apache/hadoop/hbase/client/Append.java | 2 +- .../hadoop/hbase/client/AsyncTable.java | 8 +- .../hadoop/hbase/client/AsyncTableImpl.java | 9 +- .../org/apache/hadoop/hbase/client/Get.java | 2 +- .../apache/hadoop/hbase/client/HTable.java | 111 ++++++------ .../apache/hadoop/hbase/client/Increment.java | 2 +- .../hbase/client/RawAsyncTableImpl.java | 15 +- .../org/apache/hadoop/hbase/client/Scan.java | 2 +- .../org/apache/hadoop/hbase/client/Table.java | 6 + .../hadoop/hbase/protobuf/ProtobufUtil.java | 81 +++------ .../hbase/shaded/protobuf/ProtobufUtil.java | 108 +++++------- .../shaded/protobuf/RequestConverter.java | 75 ++++----- .../shaded/protobuf/TestProtobufUtil.java | 6 +- .../org/apache/hadoop/hbase/io/TimeRange.java | 17 ++ .../src/main/protobuf/Client.proto | 1 + hbase-protocol/src/main/protobuf/Client.proto | 1 + .../hbase/rest/client/RemoteHTable.java | 5 + .../hadoop/hbase/regionserver/HRegion.java | 24 +-- .../hbase/regionserver/RSRpcServices.java | 158 ++++++++++-------- .../hadoop/hbase/regionserver/Region.java | 55 +++++- .../hadoop/hbase/client/TestAsyncTable.java | 63 +++++++ .../hbase/client/TestFromClientSide.java | 55 ++++++ .../client/TestMalformedCellFromClient.java | 2 +- .../hbase/protobuf/TestProtobufUtil.java | 5 + .../regionserver/TestAtomicOperation.java | 2 +- .../hbase/regionserver/TestHRegion.java | 68 ++++---- .../TestSimpleTimeRangeTracker.java | 10 +- 27 files changed, 528 insertions(+), 365 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 61474b7e357..3a08d687fbb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; public class Append extends Mutation { private static final Logger LOG = LoggerFactory.getLogger(Append.class); private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); /** * Sets the TimeRange to be used on the Get for this append. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 37c80b3bfac..cc1ba871799 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -22,15 +22,14 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; import com.google.protobuf.RpcChannel; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -235,6 +234,11 @@ public interface AsyncTable { */ CheckAndMutateBuilder qualifier(byte[] qualifier); + /** + * @param timeRange time range to check. + */ + CheckAndMutateBuilder timeRange(TimeRange timeRange); + /** * Check for lack of column. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index c8553c6760e..9747d0665d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import com.google.protobuf.RpcChannel; - import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; /** @@ -151,6 +150,12 @@ class AsyncTableImpl implements AsyncTable { return this; } + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + builder.timeRange(timeRange); + return this; + } + @Override public CheckAndMutateBuilder ifNotExists() { builder.ifNotExists(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index 9ed3b385304..aae52d21bd4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -72,7 +72,7 @@ public class Get extends Query implements Row { private boolean cacheBlocks = true; private int storeLimit = -1; private int storeOffset = 0; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); private boolean checkExistenceOnly = false; private boolean closestRowBefore = false; private Map> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1a119790834..69ec3661c5a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -692,14 +693,14 @@ public class HTable implements Table { @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, put); + return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put); } @Override @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - return doCheckAndPut(row, family, qualifier, compareOp.name(), value, put); + return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put); } @Override @@ -708,11 +709,12 @@ public class HTable implements Table { final CompareOperator op, final byte [] value, final Put put) throws IOException { // The name of the operators in CompareOperator are intentionally those of the // operators in the filter's CompareOp enum. - return doCheckAndPut(row, family, qualifier, op.name(), value, put); + return doCheckAndPut(row, family, qualifier, op.name(), value, null, put); } - private boolean doCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final Put put) throws IOException { + private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final Put put) + throws IOException { ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), row, this.rpcControllerFactory.newController(), put.getPriority()) { @@ -721,7 +723,7 @@ public class HTable implements Table { CompareType compareType = CompareType.valueOf(opName); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); + new BinaryComparator(value), compareType, timeRange, put); MutateResponse response = doMutate(request); return Boolean.valueOf(response.getProcessed()); } @@ -732,60 +734,58 @@ public class HTable implements Table { @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, + delete); } @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete); } @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOperator op, final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, op.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final CompareOperator op, final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete); } - private boolean doCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final Delete delete) throws IOException { + private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final Delete delete) + throws IOException { CancellableRegionServerCallable callable = - new CancellableRegionServerCallable( - this.connection, getName(), row, this.rpcControllerFactory.newController(), - writeRpcTimeoutMs, new RetryingTimeTracker().start(), delete.getPriority()) { - @Override - protected SingleResponse rpcCall() throws Exception { - CompareType compareType = CompareType.valueOf(opName); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = doMutate(request); - return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); - } - }; + new CancellableRegionServerCallable(this.connection, getName(), row, + this.rpcControllerFactory.newController(), writeRpcTimeoutMs, + new RetryingTimeTracker().start(), delete.getPriority()) { + @Override + protected SingleResponse rpcCall() throws Exception { + CompareType compareType = CompareType.valueOf(opName); + MutateRequest request = RequestConverter + .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, new BinaryComparator(value), compareType, timeRange, delete); + MutateResponse response = doMutate(request); + return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); + } + }; List rows = Collections.singletonList(delete); Object[] results = new Object[1]; - AsyncProcessTask task = AsyncProcessTask.newBuilder() - .setPool(pool) - .setTableName(tableName) - .setRowAccess(rows) + AsyncProcessTask task = + AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows) .setCallable(callable) // TODO any better timeout? .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) .setOperationTimeout(operationTimeoutMs) - .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) - .setResults(results) - .build(); + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build(); AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); } - return ((SingleResponse.Entry)results[0]).isProcessed(); + return ((SingleResponse.Entry) results[0]).isProcessed(); } @Override @@ -793,9 +793,9 @@ public class HTable implements Table { return new CheckAndMutateBuilderImpl(row, family); } - private boolean doCheckAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final RowMutations rm) - throws IOException { + private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm) + throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), @@ -803,18 +803,18 @@ public class HTable implements Table { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(opName); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); + MultiRequest request = RequestConverter + .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, timeRange, rm); ClientProtos.MultiResponse response = doMulti(request); ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); if (res.hasException()) { Throwable ex = ProtobufUtil.toException(res.getException()); if (ex instanceof IOException) { - throw (IOException)ex; + throw (IOException) ex; } - throw new IOException("Failed to checkAndMutate row: "+ - Bytes.toStringBinary(rm.getRow()), ex); + throw new IOException( + "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); } return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); } @@ -850,14 +850,14 @@ public class HTable implements Table { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, rm); + return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm); } @Override @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { - return doCheckAndMutate(row, family, qualifier, op.name(), value, rm); + return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm); } @Override @@ -1234,6 +1234,7 @@ public class HTable implements Table { private final byte[] row; private final byte[] family; private byte[] qualifier; + private TimeRange timeRange; private CompareOperator op; private byte[] value; @@ -1249,6 +1250,12 @@ public class HTable implements Table { return this; } + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + @Override public CheckAndMutateBuilder ifNotExists() { this.op = CompareOperator.EQUAL; @@ -1271,19 +1278,19 @@ public class HTable implements Table { @Override public boolean thenPut(Put put) throws IOException { preCheck(); - return doCheckAndPut(row, family, qualifier, op.name(), value, put); + return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put); } @Override public boolean thenDelete(Delete delete) throws IOException { preCheck(); - return doCheckAndDelete(row, family, qualifier, op.name(), value, delete); + return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete); } @Override public boolean thenMutate(RowMutations mutation) throws IOException { preCheck(); - return doCheckAndMutate(row, family, qualifier, op.name(), value, mutation); + return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index 76208d6842e..d7d11160a78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -48,7 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Public public class Increment extends Mutation { private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); /** * Create a Increment operation for the specified row. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index e6f78a1fc8e..d705d7c4f36 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -265,6 +266,8 @@ class RawAsyncTableImpl implements AsyncTable { private byte[] qualifier; + private TimeRange timeRange; + private CompareOperator op; private byte[] value; @@ -281,6 +284,12 @@ class RawAsyncTableImpl implements AsyncTable { return this; } + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + @Override public CheckAndMutateBuilder ifNotExists() { this.op = CompareOperator.EQUAL; @@ -307,7 +316,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), p), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), (c, r) -> r.getProcessed())) .call(); } @@ -319,7 +328,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), d), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), (c, r) -> r.getProcessed())) .call(); } @@ -331,7 +340,7 @@ class RawAsyncTableImpl implements AsyncTable { .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), rm), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), resp -> resp.getExists())) .call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 7139b26da98..20a2adaaeed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -141,7 +141,7 @@ public class Scan extends Query { private long maxResultSize = -1; private boolean cacheBlocks = true; private boolean reversed = false; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); private Map> familyMap = new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean asyncPrefetch = null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 81513fe9e98..fab439cdb61 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.CompareFilter; @@ -437,6 +438,11 @@ public interface Table extends Closeable { */ CheckAndMutateBuilder qualifier(byte[] qualifier); + /** + * @param timeRange timeRange to check + */ + CheckAndMutateBuilder timeRange(TimeRange timeRange); + /** * Check for lack of column. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index f11fcf6a2de..4610863a780 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -29,7 +29,6 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; - import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; @@ -38,10 +37,8 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableSet; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; @@ -878,20 +875,13 @@ public final class ProtobufUtil { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand); } scanBuilder.setMaxVersions(scan.getMaxVersions()); - for (Entry cftr : scan.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(ByteStringer.wrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - scanBuilder.addCfTimeRange(b); - } - TimeRange timeRange = scan.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - scanBuilder.setTimeRange(timeRangeBuilder.build()); - } + scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(ByteStringer.wrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + scanBuilder.setTimeRange(toTimeRange(scan.getTimeRange())); Map attributes = scan.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1079,20 +1069,12 @@ public final class ProtobufUtil { if (get.getFilter() != null) { builder.setFilter(ProtobufUtil.toFilter(get.getFilter())); } - for (Entry cftr : get.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(ByteStringer.wrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - builder.addCfTimeRange(b); - } - TimeRange timeRange = get.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } + get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> + builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(ByteStringer.wrap(cf)) + .setTimeRange(toTimeRange(timeRange)).build()) + ); + builder.setTimeRange(toTimeRange(get.getTimeRange())); Map attributes = get.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1138,16 +1120,6 @@ public final class ProtobufUtil { return builder.build(); } - static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) { - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1179,12 +1151,10 @@ public final class ProtobufUtil { builder.setNonce(nonce); } if (type == MutationType.INCREMENT) { - TimeRange timeRange = ((Increment) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(toTimeRange(((Increment) mutation).getTimeRange())); } if (type == MutationType.APPEND) { - TimeRange timeRange = ((Append) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(toTimeRange(((Append) mutation).getTimeRange())); } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); @@ -1242,10 +1212,10 @@ public final class ProtobufUtil { getMutationBuilderAndSetCommonFields(type, mutation, builder); builder.setAssociatedCellCount(mutation.size()); if (mutation instanceof Increment) { - setTimeRange(builder, ((Increment)mutation).getTimeRange()); + builder.setTimeRange(toTimeRange(((Increment)mutation).getTimeRange())); } if (mutation instanceof Append) { - setTimeRange(builder, ((Append)mutation).getTimeRange()); + builder.setTimeRange(toTimeRange(((Append)mutation).getTimeRange())); } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); @@ -1721,14 +1691,6 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } - private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - return timeRangeBuilder; - } - private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException { long minStamp = 0; long maxStamp = Long.MAX_VALUE; @@ -1821,4 +1783,13 @@ public final class ProtobufUtil { } return RSGroupInfo; } + + public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) { + if (timeRange == null) { + timeRange = TimeRange.allTime(); + } + return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin()) + .setTo(timeRange.getMax()) + .build(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 564adefcee9..181615a0b30 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -532,13 +532,13 @@ public final class ProtobufUtil { } if (proto.getCfTimeRangeCount() > 0) { for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) { - TimeRange timeRange = protoToTimeRange(cftr.getTimeRange()); + TimeRange timeRange = toTimeRange(cftr.getTimeRange()); get.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(), timeRange.getMin(), timeRange.getMax()); } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } if (proto.hasFilter()) { @@ -861,7 +861,7 @@ public final class ProtobufUtil { Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), Append::add, proto, cellScanner); if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); append.setTimeRange(timeRange.getMin(), timeRange.getMax()); } return append; @@ -881,7 +881,7 @@ public final class ProtobufUtil { Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()), Increment::add, proto, cellScanner); if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); increment.setTimeRange(timeRange.getMin(), timeRange.getMax()); } return increment; @@ -953,7 +953,7 @@ public final class ProtobufUtil { } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } for (NameBytesPair attribute : proto.getAttributeList()) { @@ -1017,20 +1017,13 @@ public final class ProtobufUtil { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand); } scanBuilder.setMaxVersions(scan.getMaxVersions()); - for (Entry cftr : scan.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - scanBuilder.addCfTimeRange(b); - } - TimeRange timeRange = scan.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - scanBuilder.setTimeRange(timeRangeBuilder.build()); - } + scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(scan.getTimeRange())); Map attributes = scan.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1149,13 +1142,13 @@ public final class ProtobufUtil { } if (proto.getCfTimeRangeCount() > 0) { for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) { - TimeRange timeRange = protoToTimeRange(cftr.getTimeRange()); + TimeRange timeRange = toTimeRange(cftr.getTimeRange()); scan.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(), timeRange.getMin(), timeRange.getMax()); } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); scan.setTimeRange(timeRange.getMin(), timeRange.getMax()); } if (proto.hasFilter()) { @@ -1245,20 +1238,13 @@ public final class ProtobufUtil { if (get.getFilter() != null) { builder.setFilter(ProtobufUtil.toFilter(get.getFilter())); } - for (Entry cftr : get.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - builder.addCfTimeRange(b); - } - TimeRange timeRange = get.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } + get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + builder.setTimeRange(ProtobufUtil.toTimeRange(get.getTimeRange())); Map attributes = get.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1303,16 +1289,6 @@ public final class ProtobufUtil { return builder.build(); } - static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) { - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1344,12 +1320,10 @@ public final class ProtobufUtil { builder.setNonce(nonce); } if (type == MutationType.INCREMENT) { - TimeRange timeRange = ((Increment) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange())); } if (type == MutationType.APPEND) { - TimeRange timeRange = ((Append) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange())); } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); @@ -1407,10 +1381,10 @@ public final class ProtobufUtil { getMutationBuilderAndSetCommonFields(type, mutation, builder); builder.setAssociatedCellCount(mutation.size()); if (mutation instanceof Increment) { - setTimeRange(builder, ((Increment)mutation).getTimeRange()); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange())); } if (mutation instanceof Append) { - setTimeRange(builder, ((Append)mutation).getTimeRange()); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange())); } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); @@ -2757,24 +2731,11 @@ public final class ProtobufUtil { return scList; } - private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - return timeRangeBuilder; - } - - private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException { - long minStamp = 0; - long maxStamp = Long.MAX_VALUE; - if (timeRange.hasFrom()) { - minStamp = timeRange.getFrom(); - } - if (timeRange.hasTo()) { - maxStamp = timeRange.getTo(); - } - return new TimeRange(minStamp, maxStamp); + public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) { + return timeRange == null ? + TimeRange.allTime() : + new TimeRange(timeRange.hasFrom() ? timeRange.getFrom() : 0, + timeRange.hasTo() ? timeRange.getTo() : Long.MAX_VALUE); } /** @@ -3229,4 +3190,13 @@ public final class ProtobufUtil { .setTimeStampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp()) .build(); } + + public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) { + if (timeRange == null) { + timeRange = TimeRange.allTime(); + } + return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin()) + .setTo(timeRange.getMax()) + .build(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 0afcfe13f07..8ce2f1b855e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -235,16 +236,9 @@ public final class RequestConverter { public static MutateRequest buildMutateRequest( final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final Put put) throws IOException { - MutateRequest.Builder builder = MutateRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder())); - builder.setCondition(condition); - return builder.build(); + final CompareType compareType, TimeRange timeRange, final Put put) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange + , put, MutationType.PUT); } /** @@ -263,19 +257,21 @@ public final class RequestConverter { public static MutateRequest buildMutateRequest( final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final Delete delete) throws IOException { - MutateRequest.Builder builder = MutateRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, - MutationProto.newBuilder())); - builder.setCondition(condition); - return builder.build(); + final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange + , delete, MutationType.DELETE); } + public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, + final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator, + final CompareType compareType, TimeRange timeRange, final Mutation mutation, + final MutationType type) throws IOException { + return MutateRequest.newBuilder() + .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) + .setMutation(ProtobufUtil.toMutation(type, mutation)) + .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .build(); + } /** * Create a protocol buffer MutateRequest for conditioned row mutations * @@ -289,17 +285,15 @@ public final class RequestConverter { * @return a mutate request * @throws IOException */ - public static ClientProtos.MultiRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final RowMutations rowMutations) throws IOException { + public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName, + final byte[] row, final byte[] family, final byte[] qualifier, + final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange, + final RowMutations rowMutations) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); builder.setAtomic(true); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); for (Mutation mutation: rowMutations.getMutations()) { MutationType mutateType = null; if (mutation instanceof Put) { @@ -316,10 +310,9 @@ public final class RequestConverter { actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - ClientProtos.MultiRequest request = - ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(condition).build(); - return request; + return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) + .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .build(); } /** @@ -1100,16 +1093,16 @@ public final class RequestConverter { * @throws IOException */ public static Condition buildCondition(final byte[] row, final byte[] family, - final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType) - throws IOException { - Condition.Builder builder = Condition.newBuilder(); - builder.setRow(UnsafeByteOperations.unsafeWrap(row)); - builder.setFamily(UnsafeByteOperations.unsafeWrap(family)); - builder.setQualifier(UnsafeByteOperations - .unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)); - builder.setComparator(ProtobufUtil.toComparator(comparator)); - builder.setCompareType(compareType); - return builder.build(); + final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType, + final TimeRange timeRange) { + return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row)) + .setFamily(UnsafeByteOperations.unsafeWrap(family)) + .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ? + HConstants.EMPTY_BYTE_ARRAY : qualifier)) + .setComparator(ProtobufUtil.toComparator(comparator)) + .setCompareType(compareType) + .setTimeRange(ProtobufUtil.toTimeRange(timeRange)) + .build(); } /** diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java index 77c0650ad3e..f16f060eea7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; @@ -110,7 +111,7 @@ public class TestProtobufUtil { getBuilder = ClientProtos.Get.newBuilder(proto); getBuilder.setMaxVersions(1); getBuilder.setCacheBlocks(true); - + getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); Get get = ProtobufUtil.toGet(proto); assertEquals(getBuilder.build(), ProtobufUtil.toGet(get)); } @@ -244,6 +245,7 @@ public class TestProtobufUtil { scanBuilder.setMaxVersions(2); scanBuilder.setCacheBlocks(false); scanBuilder.setCaching(1024); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); ClientProtos.Scan expectedProto = scanBuilder.build(); ClientProtos.Scan actualProto = ProtobufUtil.toScan( @@ -305,6 +307,7 @@ public class TestProtobufUtil { Increment increment = ProtobufUtil.toIncrement(proto, null); mutateBuilder.setTimestamp(increment.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); } @@ -345,6 +348,7 @@ public class TestProtobufUtil { // append always use the latest timestamp, // reset the timestamp to the original mutate mutateBuilder.setTimestamp(append.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append)); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java index e4503463571..c44ab699d31 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java @@ -37,6 +37,20 @@ import org.apache.yetus.audience.InterfaceAudience; public class TimeRange { public static final long INITIAL_MIN_TIMESTAMP = 0L; public static final long INITIAL_MAX_TIMESTAMP = Long.MAX_VALUE; + private static final TimeRange ALL_TIME = new TimeRange(INITIAL_MIN_TIMESTAMP, + INITIAL_MAX_TIMESTAMP); + + public static TimeRange allTime() { + return ALL_TIME; + } + + public static TimeRange at(long ts) { + if (ts < 0 || ts == Long.MAX_VALUE) { + throw new IllegalArgumentException("invalid ts:" + ts); + } + return new TimeRange(ts, ts + 1); + } + private final long minStamp; private final long maxStamp; private final boolean allTime; @@ -150,7 +164,10 @@ public class TimeRange { * @param bytes timestamp to check * @param offset offset into the bytes * @return true if within TimeRange, false if not + * @deprecated This is made @InterfaceAudience.Private in the 2.0 line and above and may be + * changed to private or removed in 3.0. Use {@link #withinTimeRange(long)} instead */ + @Deprecated public boolean withinTimeRange(byte [] bytes, int offset) { if (allTime) { return true; diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 325b9c14608..14abb085d6e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -143,6 +143,7 @@ message Condition { required bytes qualifier = 3; required CompareType compare_type = 4; required Comparator comparator = 5; + optional TimeRange time_range = 6; } diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 817c26edbe6..3681ae9b79f 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -143,6 +143,7 @@ message Condition { required bytes qualifier = 3; required CompareType compare_type = 4; required Comparator comparator = 5; + optional TimeRange time_range = 6; } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 21c78583cae..b8d0035f2b7 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -975,6 +975,11 @@ public class RemoteHTable implements Table { return this; } + @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + throw new UnsupportedOperationException("timeRange not implemented"); + } + @Override public CheckAndMutateBuilder ifNotExists() { throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 732d5e0f94c..f676b0eb255 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4008,28 +4008,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) - throws IOException{ + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException { checkMutationType(mutation, row); - return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, - mutation); + return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation); } @Override - public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, RowMutations rm) - throws IOException { - return doCheckAndRowMutate(row, family, qualifier, op, comparator, rm, null); + public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException { + return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null); } /** * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has * switches in the few places where there is deviation. */ - private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, RowMutations rowMutations, - Mutation mutation) + private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, + RowMutations rowMutations, Mutation mutation) throws IOException { // Could do the below checks but seems wacky with two callers only. Just comment out for now. // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't @@ -4044,6 +4041,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Get get = new Get(row); checkFamily(family); get.addColumn(family, qualifier); + if (timeRange != null) { + get.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } // Lock row - note that doBatchMutate will relock this row if called checkRow(row, "doCheckAndRowMutate"); RowLock rowLock = getRowLockInternal(get.getRow(), false, null); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 319684e9216..2793d2d2528 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; @@ -593,9 +594,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. */ private boolean checkAndRowMutate(final HRegion region, final List actions, - final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, - ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { + final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder, + ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { int countOfCompleteMutation = 0; try { if (!region.getRegionInfo().isMetaRegion()) { @@ -638,7 +639,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException( resultOrExceptionOrBuilder.build()); } - return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm); + return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); } finally { // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner // even if the malformed cells are not skipped. @@ -2640,9 +2641,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, CompareOperator.valueOf(condition.getCompareType().name()); ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - processed = checkAndRowMutate(region, regionAction.getActionList(), - cellScanner, row, family, qualifier, op, - comparator, regionActionResultBuilder, spaceQuotaEnforcement); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); + processed = + checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, + qualifier, op, comparator, timeRange, regionActionResultBuilder, + spaceQuotaEnforcement); } else { doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), cellScanner, spaceQuotaEnforcement); @@ -2763,79 +2768,84 @@ public class RSRpcServices implements HBaseRPCErrorHandler, spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); switch (type) { - case APPEND: - // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case INCREMENT: - // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case PUT: - Put put = ProtobufUtil.toPut(mutation, cellScanner); - checkCellSizeLimit(region, put); - // Throws an exception when violated - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - quota.addMutation(put); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator compareOp = - CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, - compareOp, comparator, put); - } - if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, compareOp, comparator, put, true); + case APPEND: + // TODO: this doesn't actually check anything. + r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case INCREMENT: + // TODO: this doesn't actually check anything. + r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case PUT: + Put put = ProtobufUtil.toPut(mutation, cellScanner); + checkCellSizeLimit(region, put); + // Throws an exception when violated + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + quota.addMutation(put); + if (request.hasCondition()) { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.getFamily().toByteArray(); + byte[] qualifier = condition.getQualifier().toByteArray(); + CompareOperator compareOp = + CompareOperator.valueOf(condition.getCompareType().name()); + ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndPut(row, family, - qualifier, compareOp, comparator, put, result); + processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, + compareOp, comparator, put); } - processed = result; + if (processed == null) { + boolean result = region.checkAndMutate(row, family, + qualifier, compareOp, comparator, timeRange, put); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndPut(row, family, + qualifier, compareOp, comparator, put, result); + } + processed = result; + } + } else { + region.put(put); + processed = Boolean.TRUE; } - } else { - region.put(put); - processed = Boolean.TRUE; - } - break; - case DELETE: - Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); - checkCellSizeLimit(region, delete); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); - quota.addMutation(delete); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, - comparator, delete); - } - if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, op, comparator, delete, true); + break; + case DELETE: + Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + checkCellSizeLimit(region, delete); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); + quota.addMutation(delete); + if (request.hasCondition()) { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.getFamily().toByteArray(); + byte[] qualifier = condition.getQualifier().toByteArray(); + CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); + ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndDelete(row, family, - qualifier, op, comparator, delete, result); + processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, + comparator, delete); } - processed = result; + if (processed == null) { + boolean result = region.checkAndMutate(row, family, + qualifier, op, comparator, timeRange, delete); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndDelete(row, family, + qualifier, op, comparator, delete, result); + } + processed = result; + } + } else { + region.delete(delete); + processed = Boolean.TRUE; } - } else { - region.delete(delete); - processed = Boolean.TRUE; - } - break; - default: - throw new DoNotRetryIOException( - "Unsupported mutate type: " + type.name()); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } if (processed != null) { builder.setProcessed(processed.booleanValue()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 27771ceb1e4..80b18b8ae2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -303,14 +304,31 @@ public interface Region extends ConfigurationObserver { * @param family column family to check * @param qualifier column qualifier to check * @param op the comparison operator - * @param comparator - * @param mutation - * @param writeToWAL + * @param comparator the expected value + * @param mutation data to put if check succeeds + * @return true if mutation was applied, false otherwise + */ + default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, + ByteArrayComparable comparator, Mutation mutation) throws IOException { + return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value and if it does, + * it performs the mutation. If the passed value is null, the lack of column value + * (ie: non-existence) is used. See checkAndRowMutate to do many checkAndPuts at a time on a + * single row. + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param op the comparison operator + * @param comparator the expected value + * @param mutation data to put if check succeeds + * @param timeRange time range to check * @return true if mutation was applied, false otherwise - * @throws IOException */ boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, - ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException; + ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected values and if it does, @@ -321,13 +339,32 @@ public interface Region extends ConfigurationObserver { * @param family column family to check * @param qualifier column qualifier to check * @param op the comparison operator - * @param comparator - * @param mutations + * @param comparator the expected value + * @param mutations data to put if check succeeds + * @return true if mutations were applied, false otherwise + */ + default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, RowMutations mutations) throws IOException { + return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), + mutations); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected values and if it does, + * it performs the row mutations. If the passed value is null, the lack of column value + * (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate + * to do one checkAndMutate at a time. + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param op the comparison operator + * @param comparator the expected value + * @param mutations data to put if check succeeds + * @param timeRange time range to check * @return true if mutations were applied, false otherwise - * @throws IOException */ boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, - ByteArrayComparable comparator, RowMutations mutations) + ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations) throws IOException; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 37182ecd290..576c0a728c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -338,4 +339,66 @@ public class TestAsyncTable { } }); } + + @Test + public void testCheckAndMutateWithTimeRange() throws Exception { + TEST_UTIL.createTable(TableName.valueOf("testCheckAndMutateWithTimeRange"), FAMILY); + AsyncTable table = getTable.get(); + final long ts = System.currentTimeMillis() / 2; + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, ts, VALUE); + + boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .ifNotExists() + .thenPut(put) + .get(); + assertTrue(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenPut(put) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenPut(put) + .get(); + assertTrue(ok); + + RowMutations rm = new RowMutations(row) + .add((Mutation) put); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenMutate(rm) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenMutate(rm) + .get(); + assertTrue(ok); + + Delete delete = new Delete(row) + .addColumn(FAMILY, QUALIFIER); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenDelete(delete) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenDelete(delete) + .get(); + assertTrue(ok); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 29d3439ccf2..5fba1015c8b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -4831,6 +4832,60 @@ public class TestFromClientSide { } + @Test + public void testCheckAndMutateWithTimeRange() throws IOException { + Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY); + final long ts = System.currentTimeMillis() / 2; + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, ts, VALUE); + + boolean ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .ifNotExists() + .thenPut(put); + assertTrue(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenPut(put); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenPut(put); + assertTrue(ok); + + RowMutations rm = new RowMutations(ROW) + .add((Mutation) put); + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenMutate(rm); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenMutate(rm); + assertTrue(ok); + + Delete delete = new Delete(ROW) + .addColumn(FAMILY, QUALIFIER); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenDelete(delete); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenDelete(delete); + assertTrue(ok); + } + @Test public void testCheckAndPutWithCompareOp() throws IOException { final byte [] value1 = Bytes.toBytes("aaaa"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index 6305fa1412e..ef4ca253f4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -239,7 +239,7 @@ public class TestMalformedCellFromClient { ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); ClientProtos.Condition condition = RequestConverter .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]), - HBaseProtos.CompareType.EQUAL); + HBaseProtos.CompareType.EQUAL, null); for (Mutation mutation : rm.getMutations()) { ClientProtos.MutationProto.MutationType mutateType = null; if (mutation instanceof Put) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index 536af716686..7f45e404e9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column; @@ -104,6 +105,7 @@ public class TestProtobufUtil { getBuilder = ClientProtos.Get.newBuilder(proto); getBuilder.setMaxVersions(1); getBuilder.setCacheBlocks(true); + getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); Get get = ProtobufUtil.toGet(proto); assertEquals(getBuilder.build(), ProtobufUtil.toGet(get)); @@ -146,6 +148,7 @@ public class TestProtobufUtil { // append always use the latest timestamp, // reset the timestamp to the original mutate mutateBuilder.setTimestamp(append.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append)); } @@ -229,6 +232,7 @@ public class TestProtobufUtil { Increment increment = ProtobufUtil.toIncrement(proto, null); mutateBuilder.setTimestamp(increment.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); } @@ -314,6 +318,7 @@ public class TestProtobufUtil { scanBuilder.setMaxVersions(2); scanBuilder.setCacheBlocks(false); scanBuilder.setCaching(1024); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); ClientProtos.Scan expectedProto = scanBuilder.build(); ClientProtos.Scan actualProto = ProtobufUtil.toScan( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index b14c94f2541..3962bbe3858 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -663,7 +663,7 @@ public class TestAtomicOperation { } testStep = TestStep.CHECKANDPUT_STARTED; region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), - CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true); + CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put); testStep = TestStep.CHECKANDPUT_COMPLETED; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 3272afa34a3..35266896df4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1738,7 +1738,7 @@ public class TestHRegion { // checkAndPut with empty value boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( - emptyVal), put, true); + emptyVal), put); assertTrue(res); // Putting data in key @@ -1747,25 +1747,25 @@ public class TestHRegion { // checkAndPut with correct value res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), - put, true); + put); assertTrue(res); // not empty anymore res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), - put, true); + put); assertFalse(res); Delete delete = new Delete(row1); delete.addColumn(fam1, qf1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), - delete, true); + delete); assertFalse(res); put = new Put(row1); put.addColumn(fam1, qf1, val2); // checkAndPut with correct value res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), - put, true); + put); assertTrue(res); // checkAndDelete with correct value @@ -1773,12 +1773,12 @@ public class TestHRegion { delete.addColumn(fam1, qf1); delete.addColumn(fam1, qf1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2), - delete, true); + delete); assertTrue(res); delete = new Delete(row1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), - delete, true); + delete); assertTrue(res); // checkAndPut looking for a null value @@ -1786,7 +1786,7 @@ public class TestHRegion { put.addColumn(fam1, qf1, val1); res = region - .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put, true); + .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put); assertTrue(res); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -1814,14 +1814,14 @@ public class TestHRegion { // checkAndPut with wrong value boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( - val2), put, true); + val2), put); assertEquals(false, res); // checkAndDelete with wrong value Delete delete = new Delete(row1); delete.addFamily(fam1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2), - put, true); + put); assertEquals(false, res); // Putting data in key @@ -1832,7 +1832,7 @@ public class TestHRegion { // checkAndPut with wrong value res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( - bd2), put, true); + bd2), put); assertEquals(false, res); // checkAndDelete with wrong value @@ -1840,7 +1840,7 @@ public class TestHRegion { delete.addFamily(fam1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( - bd2), put, true); + bd2), put); assertEquals(false, res); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -1866,14 +1866,14 @@ public class TestHRegion { // checkAndPut with correct value boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( - val1), put, true); + val1), put); assertEquals(true, res); // checkAndDelete with correct value Delete delete = new Delete(row1); delete.addColumn(fam1, qf1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), - delete, true); + delete); assertEquals(true, res); // Putting data in key @@ -1884,7 +1884,7 @@ public class TestHRegion { // checkAndPut with correct value res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( - bd1), put, true); + bd1), put); assertEquals(true, res); // checkAndDelete with correct value @@ -1892,7 +1892,7 @@ public class TestHRegion { delete.addColumn(fam1, qf1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator( - bd1), delete, true); + bd1), delete); assertEquals(true, res); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -1920,12 +1920,12 @@ public class TestHRegion { // Test CompareOp.LESS: original = val3, compare with val3, fail boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, - new BinaryComparator(val3), put, true); + new BinaryComparator(val3), put); assertEquals(false, res); // Test CompareOp.LESS: original = val3, compare with val4, fail res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, - new BinaryComparator(val4), put, true); + new BinaryComparator(val4), put); assertEquals(false, res); // Test CompareOp.LESS: original = val3, compare with val2, @@ -1933,18 +1933,18 @@ public class TestHRegion { put = new Put(row1); put.addColumn(fam1, qf1, val2); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS, - new BinaryComparator(val2), put, true); + new BinaryComparator(val2), put); assertEquals(true, res); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, - new BinaryComparator(val3), put, true); + new BinaryComparator(val3), put); assertEquals(false, res); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, - new BinaryComparator(val2), put, true); + new BinaryComparator(val2), put); assertEquals(true, res); // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1, @@ -1952,17 +1952,17 @@ public class TestHRegion { put = new Put(row1); put.addColumn(fam1, qf1, val3); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL, - new BinaryComparator(val1), put, true); + new BinaryComparator(val1), put); assertEquals(true, res); // Test CompareOp.GREATER: original = val3, compare with val3, fail res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, - new BinaryComparator(val3), put, true); + new BinaryComparator(val3), put); assertEquals(false, res); // Test CompareOp.GREATER: original = val3, compare with val2, fail res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, - new BinaryComparator(val2), put, true); + new BinaryComparator(val2), put); assertEquals(false, res); // Test CompareOp.GREATER: original = val3, compare with val4, @@ -1970,23 +1970,23 @@ public class TestHRegion { put = new Put(row1); put.addColumn(fam1, qf1, val2); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER, - new BinaryComparator(val4), put, true); + new BinaryComparator(val4), put); assertEquals(true, res); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, - new BinaryComparator(val1), put, true); + new BinaryComparator(val1), put); assertEquals(false, res); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2, // succeed (value still = val2) res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, - new BinaryComparator(val2), put, true); + new BinaryComparator(val2), put); assertEquals(true, res); // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL, - new BinaryComparator(val3), put, true); + new BinaryComparator(val3), put); assertEquals(true, res); } finally { HBaseTestingUtility.closeRegionAndWAL(this.region); @@ -2021,7 +2021,7 @@ public class TestHRegion { // checkAndPut with wrong value boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( - val1), put, true); + val1), put); assertEquals(true, res); Get get = new Get(row1); @@ -2048,7 +2048,7 @@ public class TestHRegion { put.addColumn(fam1, qual1, value1); try { region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, - new BinaryComparator(value2), put, false); + new BinaryComparator(value2), put); fail(); } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) { // expected exception. @@ -2097,7 +2097,7 @@ public class TestHRegion { delete.addColumn(fam2, qf1); delete.addColumn(fam1, qf3); boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator( - val2), delete, true); + val2), delete); assertEquals(true, res); Get get = new Get(row1); @@ -2113,7 +2113,7 @@ public class TestHRegion { delete = new Delete(row1); delete.addFamily(fam2); res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal), - delete, true); + delete); assertEquals(true, res); get = new Get(row1); @@ -2124,7 +2124,7 @@ public class TestHRegion { // Row delete delete = new Delete(row1); res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1), - delete, true); + delete); assertEquals(true, res); get = new Get(row1); r = region.get(get); @@ -6260,7 +6260,7 @@ public class TestHRegion { p = new Put(row); p.setDurability(Durability.SKIP_WAL); p.addColumn(fam1, qual1, qual2); - region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p, false); + region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); assertEquals(10L, c.getTimestamp()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSimpleTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSimpleTimeRangeTracker.java index 6aa676848b8..a32fbdf6bde 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSimpleTimeRangeTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSimpleTimeRangeTracker.java @@ -50,13 +50,13 @@ public class TestSimpleTimeRangeTracker { @Test public void testExtreme() { - TimeRange tr = new TimeRange(); - assertTrue(tr.includesTimeRange(new TimeRange())); + TimeRange tr = TimeRange.allTime(); + assertTrue(tr.includesTimeRange(TimeRange.allTime())); TimeRangeTracker trt = getTimeRangeTracker(); - assertFalse(trt.includesTimeRange(new TimeRange())); + assertFalse(trt.includesTimeRange(TimeRange.allTime())); trt.includeTimestamp(1); trt.includeTimestamp(10); - assertTrue(trt.includesTimeRange(new TimeRange())); + assertTrue(trt.includesTimeRange(TimeRange.allTime())); } @Test @@ -114,7 +114,7 @@ public class TestSimpleTimeRangeTracker { @Test public void testRangeConstruction() throws IOException { - TimeRange defaultRange = new TimeRange(); + TimeRange defaultRange = TimeRange.allTime(); assertEquals(0L, defaultRange.getMin()); assertEquals(Long.MAX_VALUE, defaultRange.getMax()); assertTrue(defaultRange.isAllTime());