HBASE-19504 Add TimeRange support into checkAndMutate
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
489e875a78
commit
6aba045aae
|
@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
|
||||||
public class Append extends Mutation {
|
public class Append extends Mutation {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Append.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Append.class);
|
||||||
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
||||||
private TimeRange tr = new TimeRange();
|
private TimeRange tr = TimeRange.allTime();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the TimeRange to be used on the Get for this append.
|
* Sets the TimeRange to be used on the Get for this append.
|
||||||
|
|
|
@ -22,15 +22,14 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
|
||||||
|
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompareOperator;
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@ -235,6 +234,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
|
||||||
*/
|
*/
|
||||||
CheckAndMutateBuilder qualifier(byte[] qualifier);
|
CheckAndMutateBuilder qualifier(byte[] qualifier);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param timeRange time range to check.
|
||||||
|
*/
|
||||||
|
CheckAndMutateBuilder timeRange(TimeRange timeRange);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check for lack of column.
|
* Check for lack of column.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.client;
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
|
|
||||||
import com.google.protobuf.RpcChannel;
|
import com.google.protobuf.RpcChannel;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompareOperator;
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -151,6 +150,12 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
|
||||||
|
builder.timeRange(timeRange);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CheckAndMutateBuilder ifNotExists() {
|
public CheckAndMutateBuilder ifNotExists() {
|
||||||
builder.ifNotExists();
|
builder.ifNotExists();
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class Get extends Query implements Row {
|
||||||
private boolean cacheBlocks = true;
|
private boolean cacheBlocks = true;
|
||||||
private int storeLimit = -1;
|
private int storeLimit = -1;
|
||||||
private int storeOffset = 0;
|
private int storeOffset = 0;
|
||||||
private TimeRange tr = new TimeRange();
|
private TimeRange tr = TimeRange.allTime();
|
||||||
private boolean checkExistenceOnly = false;
|
private boolean checkExistenceOnly = false;
|
||||||
private boolean closestRowBefore = false;
|
private boolean closestRowBefore = false;
|
||||||
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -692,14 +693,14 @@ public class HTable implements Table {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||||
final byte [] value, final Put put) throws IOException {
|
final byte [] value, final Put put) throws IOException {
|
||||||
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, put);
|
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||||
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
|
||||||
return doCheckAndPut(row, family, qualifier, compareOp.name(), value, put);
|
return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -708,11 +709,12 @@ public class HTable implements Table {
|
||||||
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
final CompareOperator op, final byte [] value, final Put put) throws IOException {
|
||||||
// The name of the operators in CompareOperator are intentionally those of the
|
// The name of the operators in CompareOperator are intentionally those of the
|
||||||
// operators in the filter's CompareOp enum.
|
// operators in the filter's CompareOp enum.
|
||||||
return doCheckAndPut(row, family, qualifier, op.name(), value, put);
|
return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean doCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
|
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final String opName, final byte [] value, final Put put) throws IOException {
|
final String opName, final byte[] value, final TimeRange timeRange, final Put put)
|
||||||
|
throws IOException {
|
||||||
ClientServiceCallable<Boolean> callable =
|
ClientServiceCallable<Boolean> callable =
|
||||||
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
|
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
|
||||||
this.rpcControllerFactory.newController(), put.getPriority()) {
|
this.rpcControllerFactory.newController(), put.getPriority()) {
|
||||||
|
@ -721,7 +723,7 @@ public class HTable implements Table {
|
||||||
CompareType compareType = CompareType.valueOf(opName);
|
CompareType compareType = CompareType.valueOf(opName);
|
||||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
MutateRequest request = RequestConverter.buildMutateRequest(
|
||||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||||
new BinaryComparator(value), compareType, put);
|
new BinaryComparator(value), compareType, timeRange, put);
|
||||||
MutateResponse response = doMutate(request);
|
MutateResponse response = doMutate(request);
|
||||||
return Boolean.valueOf(response.getProcessed());
|
return Boolean.valueOf(response.getProcessed());
|
||||||
}
|
}
|
||||||
|
@ -732,60 +734,58 @@ public class HTable implements Table {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final byte [] value, final Delete delete) throws IOException {
|
final byte[] value, final Delete delete) throws IOException {
|
||||||
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, delete);
|
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
|
||||||
|
delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException {
|
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
|
||||||
return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, delete);
|
return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final CompareOperator op, final byte [] value, final Delete delete) throws IOException {
|
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
|
||||||
return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
|
return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean doCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
|
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final String opName, final byte [] value, final Delete delete) throws IOException {
|
final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
|
||||||
|
throws IOException {
|
||||||
CancellableRegionServerCallable<SingleResponse> callable =
|
CancellableRegionServerCallable<SingleResponse> callable =
|
||||||
new CancellableRegionServerCallable<SingleResponse>(
|
new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
|
||||||
this.connection, getName(), row, this.rpcControllerFactory.newController(),
|
this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
|
||||||
writeRpcTimeoutMs, new RetryingTimeTracker().start(), delete.getPriority()) {
|
new RetryingTimeTracker().start(), delete.getPriority()) {
|
||||||
@Override
|
@Override
|
||||||
protected SingleResponse rpcCall() throws Exception {
|
protected SingleResponse rpcCall() throws Exception {
|
||||||
CompareType compareType = CompareType.valueOf(opName);
|
CompareType compareType = CompareType.valueOf(opName);
|
||||||
MutateRequest request = RequestConverter.buildMutateRequest(
|
MutateRequest request = RequestConverter
|
||||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
|
||||||
new BinaryComparator(value), compareType, delete);
|
qualifier, new BinaryComparator(value), compareType, timeRange, delete);
|
||||||
MutateResponse response = doMutate(request);
|
MutateResponse response = doMutate(request);
|
||||||
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
|
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
List<Delete> rows = Collections.singletonList(delete);
|
List<Delete> rows = Collections.singletonList(delete);
|
||||||
Object[] results = new Object[1];
|
Object[] results = new Object[1];
|
||||||
AsyncProcessTask task = AsyncProcessTask.newBuilder()
|
AsyncProcessTask task =
|
||||||
.setPool(pool)
|
AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
|
||||||
.setTableName(tableName)
|
|
||||||
.setRowAccess(rows)
|
|
||||||
.setCallable(callable)
|
.setCallable(callable)
|
||||||
// TODO any better timeout?
|
// TODO any better timeout?
|
||||||
.setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
|
.setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
|
||||||
.setOperationTimeout(operationTimeoutMs)
|
.setOperationTimeout(operationTimeoutMs)
|
||||||
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
|
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
|
||||||
.setResults(results)
|
|
||||||
.build();
|
|
||||||
AsyncRequestFuture ars = multiAp.submit(task);
|
AsyncRequestFuture ars = multiAp.submit(task);
|
||||||
ars.waitUntilDone();
|
ars.waitUntilDone();
|
||||||
if (ars.hasError()) {
|
if (ars.hasError()) {
|
||||||
throw ars.getErrors();
|
throw ars.getErrors();
|
||||||
}
|
}
|
||||||
return ((SingleResponse.Entry)results[0]).isProcessed();
|
return ((SingleResponse.Entry) results[0]).isProcessed();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -793,9 +793,9 @@ public class HTable implements Table {
|
||||||
return new CheckAndMutateBuilderImpl(row, family);
|
return new CheckAndMutateBuilderImpl(row, family);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean doCheckAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final String opName, final byte [] value, final RowMutations rm)
|
final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
CancellableRegionServerCallable<MultiResponse> callable =
|
CancellableRegionServerCallable<MultiResponse> callable =
|
||||||
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
|
||||||
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
|
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
|
||||||
|
@ -803,18 +803,18 @@ public class HTable implements Table {
|
||||||
@Override
|
@Override
|
||||||
protected MultiResponse rpcCall() throws Exception {
|
protected MultiResponse rpcCall() throws Exception {
|
||||||
CompareType compareType = CompareType.valueOf(opName);
|
CompareType compareType = CompareType.valueOf(opName);
|
||||||
MultiRequest request = RequestConverter.buildMutateRequest(
|
MultiRequest request = RequestConverter
|
||||||
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
|
||||||
new BinaryComparator(value), compareType, rm);
|
new BinaryComparator(value), compareType, timeRange, rm);
|
||||||
ClientProtos.MultiResponse response = doMulti(request);
|
ClientProtos.MultiResponse response = doMulti(request);
|
||||||
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
|
||||||
if (res.hasException()) {
|
if (res.hasException()) {
|
||||||
Throwable ex = ProtobufUtil.toException(res.getException());
|
Throwable ex = ProtobufUtil.toException(res.getException());
|
||||||
if (ex instanceof IOException) {
|
if (ex instanceof IOException) {
|
||||||
throw (IOException)ex;
|
throw (IOException) ex;
|
||||||
}
|
}
|
||||||
throw new IOException("Failed to checkAndMutate row: "+
|
throw new IOException(
|
||||||
Bytes.toStringBinary(rm.getRow()), ex);
|
"Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
|
||||||
}
|
}
|
||||||
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
|
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
|
||||||
}
|
}
|
||||||
|
@ -850,14 +850,14 @@ public class HTable implements Table {
|
||||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||||
final CompareOp compareOp, final byte [] value, final RowMutations rm)
|
final CompareOp compareOp, final byte [] value, final RowMutations rm)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, rm);
|
return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
|
||||||
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
|
||||||
return doCheckAndMutate(row, family, qualifier, op.name(), value, rm);
|
return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1234,6 +1234,7 @@ public class HTable implements Table {
|
||||||
private final byte[] row;
|
private final byte[] row;
|
||||||
private final byte[] family;
|
private final byte[] family;
|
||||||
private byte[] qualifier;
|
private byte[] qualifier;
|
||||||
|
private TimeRange timeRange;
|
||||||
private CompareOperator op;
|
private CompareOperator op;
|
||||||
private byte[] value;
|
private byte[] value;
|
||||||
|
|
||||||
|
@ -1249,6 +1250,12 @@ public class HTable implements Table {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
|
||||||
|
this.timeRange = timeRange;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CheckAndMutateBuilder ifNotExists() {
|
public CheckAndMutateBuilder ifNotExists() {
|
||||||
this.op = CompareOperator.EQUAL;
|
this.op = CompareOperator.EQUAL;
|
||||||
|
@ -1271,19 +1278,19 @@ public class HTable implements Table {
|
||||||
@Override
|
@Override
|
||||||
public boolean thenPut(Put put) throws IOException {
|
public boolean thenPut(Put put) throws IOException {
|
||||||
preCheck();
|
preCheck();
|
||||||
return doCheckAndPut(row, family, qualifier, op.name(), value, put);
|
return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean thenDelete(Delete delete) throws IOException {
|
public boolean thenDelete(Delete delete) throws IOException {
|
||||||
preCheck();
|
preCheck();
|
||||||
return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
|
return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean thenMutate(RowMutations mutation) throws IOException {
|
public boolean thenMutate(RowMutations mutation) throws IOException {
|
||||||
preCheck();
|
preCheck();
|
||||||
return doCheckAndMutate(row, family, qualifier, op.name(), value, mutation);
|
return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
public class Increment extends Mutation {
|
public class Increment extends Mutation {
|
||||||
private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
|
||||||
private TimeRange tr = new TimeRange();
|
private TimeRange tr = TimeRange.allTime();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a Increment operation for the specified row.
|
* Create a Increment operation for the specified row.
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
@ -265,6 +266,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
|
|
||||||
private byte[] qualifier;
|
private byte[] qualifier;
|
||||||
|
|
||||||
|
private TimeRange timeRange;
|
||||||
|
|
||||||
private CompareOperator op;
|
private CompareOperator op;
|
||||||
|
|
||||||
private byte[] value;
|
private byte[] value;
|
||||||
|
@ -281,6 +284,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
|
||||||
|
this.timeRange = timeRange;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CheckAndMutateBuilder ifNotExists() {
|
public CheckAndMutateBuilder ifNotExists() {
|
||||||
this.op = CompareOperator.EQUAL;
|
this.op = CompareOperator.EQUAL;
|
||||||
|
@ -307,7 +316,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
|
||||||
loc, stub, put,
|
loc, stub, put,
|
||||||
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||||
new BinaryComparator(value), CompareType.valueOf(op.name()), p),
|
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
|
||||||
(c, r) -> r.getProcessed()))
|
(c, r) -> r.getProcessed()))
|
||||||
.call();
|
.call();
|
||||||
}
|
}
|
||||||
|
@ -319,7 +328,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
|
||||||
loc, stub, delete,
|
loc, stub, delete,
|
||||||
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||||
new BinaryComparator(value), CompareType.valueOf(op.name()), d),
|
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
|
||||||
(c, r) -> r.getProcessed()))
|
(c, r) -> r.getProcessed()))
|
||||||
.call();
|
.call();
|
||||||
}
|
}
|
||||||
|
@ -331,7 +340,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
|
||||||
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
|
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
|
||||||
stub, mutation,
|
stub, mutation,
|
||||||
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
|
||||||
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
|
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
|
||||||
resp -> resp.getExists()))
|
resp -> resp.getExists()))
|
||||||
.call();
|
.call();
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class Scan extends Query {
|
||||||
private long maxResultSize = -1;
|
private long maxResultSize = -1;
|
||||||
private boolean cacheBlocks = true;
|
private boolean cacheBlocks = true;
|
||||||
private boolean reversed = false;
|
private boolean reversed = false;
|
||||||
private TimeRange tr = new TimeRange();
|
private TimeRange tr = TimeRange.allTime();
|
||||||
private Map<byte [], NavigableSet<byte []>> familyMap =
|
private Map<byte [], NavigableSet<byte []>> familyMap =
|
||||||
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
|
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
|
||||||
private Boolean asyncPrefetch = null;
|
private Boolean asyncPrefetch = null;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompareOperator;
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||||
|
@ -437,6 +438,11 @@ public interface Table extends Closeable {
|
||||||
*/
|
*/
|
||||||
CheckAndMutateBuilder qualifier(byte[] qualifier);
|
CheckAndMutateBuilder qualifier(byte[] qualifier);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param timeRange timeRange to check
|
||||||
|
*/
|
||||||
|
CheckAndMutateBuilder timeRange(TimeRange timeRange);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check for lack of column.
|
* Check for lack of column.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -29,7 +29,6 @@ import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.Service;
|
import com.google.protobuf.Service;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -38,10 +37,8 @@ import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.Cell.Type;
|
import org.apache.hadoop.hbase.Cell.Type;
|
||||||
|
@ -878,20 +875,13 @@ public final class ProtobufUtil {
|
||||||
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
|
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
|
||||||
}
|
}
|
||||||
scanBuilder.setMaxVersions(scan.getMaxVersions());
|
scanBuilder.setMaxVersions(scan.getMaxVersions());
|
||||||
for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) {
|
scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
|
||||||
HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
|
scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
|
||||||
b.setColumnFamily(ByteStringer.wrap(cftr.getKey()));
|
.setColumnFamily(ByteStringer.wrap(cf))
|
||||||
b.setTimeRange(timeRangeToProto(cftr.getValue()));
|
.setTimeRange(toTimeRange(timeRange))
|
||||||
scanBuilder.addCfTimeRange(b);
|
.build());
|
||||||
}
|
});
|
||||||
TimeRange timeRange = scan.getTimeRange();
|
scanBuilder.setTimeRange(toTimeRange(scan.getTimeRange()));
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
scanBuilder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
Map<String, byte[]> attributes = scan.getAttributesMap();
|
Map<String, byte[]> attributes = scan.getAttributesMap();
|
||||||
if (!attributes.isEmpty()) {
|
if (!attributes.isEmpty()) {
|
||||||
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
||||||
|
@ -1079,20 +1069,12 @@ public final class ProtobufUtil {
|
||||||
if (get.getFilter() != null) {
|
if (get.getFilter() != null) {
|
||||||
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
|
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
|
||||||
}
|
}
|
||||||
for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) {
|
get.getColumnFamilyTimeRange().forEach((cf, timeRange) ->
|
||||||
HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
|
builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
|
||||||
b.setColumnFamily(ByteStringer.wrap(cftr.getKey()));
|
.setColumnFamily(ByteStringer.wrap(cf))
|
||||||
b.setTimeRange(timeRangeToProto(cftr.getValue()));
|
.setTimeRange(toTimeRange(timeRange)).build())
|
||||||
builder.addCfTimeRange(b);
|
);
|
||||||
}
|
builder.setTimeRange(toTimeRange(get.getTimeRange()));
|
||||||
TimeRange timeRange = get.getTimeRange();
|
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
builder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
Map<String, byte[]> attributes = get.getAttributesMap();
|
Map<String, byte[]> attributes = get.getAttributesMap();
|
||||||
if (!attributes.isEmpty()) {
|
if (!attributes.isEmpty()) {
|
||||||
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
||||||
|
@ -1138,16 +1120,6 @@ public final class ProtobufUtil {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) {
|
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
builder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
|
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return toMutation(type, mutation, HConstants.NO_NONCE);
|
return toMutation(type, mutation, HConstants.NO_NONCE);
|
||||||
|
@ -1179,12 +1151,10 @@ public final class ProtobufUtil {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
}
|
}
|
||||||
if (type == MutationType.INCREMENT) {
|
if (type == MutationType.INCREMENT) {
|
||||||
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
builder.setTimeRange(toTimeRange(((Increment) mutation).getTimeRange()));
|
||||||
setTimeRange(builder, timeRange);
|
|
||||||
}
|
}
|
||||||
if (type == MutationType.APPEND) {
|
if (type == MutationType.APPEND) {
|
||||||
TimeRange timeRange = ((Append) mutation).getTimeRange();
|
builder.setTimeRange(toTimeRange(((Append) mutation).getTimeRange()));
|
||||||
setTimeRange(builder, timeRange);
|
|
||||||
}
|
}
|
||||||
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
||||||
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
||||||
|
@ -1242,10 +1212,10 @@ public final class ProtobufUtil {
|
||||||
getMutationBuilderAndSetCommonFields(type, mutation, builder);
|
getMutationBuilderAndSetCommonFields(type, mutation, builder);
|
||||||
builder.setAssociatedCellCount(mutation.size());
|
builder.setAssociatedCellCount(mutation.size());
|
||||||
if (mutation instanceof Increment) {
|
if (mutation instanceof Increment) {
|
||||||
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
builder.setTimeRange(toTimeRange(((Increment)mutation).getTimeRange()));
|
||||||
}
|
}
|
||||||
if (mutation instanceof Append) {
|
if (mutation instanceof Append) {
|
||||||
setTimeRange(builder, ((Append)mutation).getTimeRange());
|
builder.setTimeRange(toTimeRange(((Append)mutation).getTimeRange()));
|
||||||
}
|
}
|
||||||
if (nonce != HConstants.NO_NONCE) {
|
if (nonce != HConstants.NO_NONCE) {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
|
@ -1721,14 +1691,6 @@ public final class ProtobufUtil {
|
||||||
codedInput.checkLastTagWas(0);
|
codedInput.checkLastTagWas(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
return timeRangeBuilder;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException {
|
private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException {
|
||||||
long minStamp = 0;
|
long minStamp = 0;
|
||||||
long maxStamp = Long.MAX_VALUE;
|
long maxStamp = Long.MAX_VALUE;
|
||||||
|
@ -1821,4 +1783,13 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
return RSGroupInfo;
|
return RSGroupInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) {
|
||||||
|
if (timeRange == null) {
|
||||||
|
timeRange = TimeRange.allTime();
|
||||||
|
}
|
||||||
|
return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin())
|
||||||
|
.setTo(timeRange.getMax())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -532,13 +532,13 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
if (proto.getCfTimeRangeCount() > 0) {
|
if (proto.getCfTimeRangeCount() > 0) {
|
||||||
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
|
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
|
||||||
TimeRange timeRange = protoToTimeRange(cftr.getTimeRange());
|
TimeRange timeRange = toTimeRange(cftr.getTimeRange());
|
||||||
get.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
|
get.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
|
||||||
timeRange.getMin(), timeRange.getMax());
|
timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (proto.hasTimeRange()) {
|
if (proto.hasTimeRange()) {
|
||||||
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||||
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
if (proto.hasFilter()) {
|
if (proto.hasFilter()) {
|
||||||
|
@ -861,7 +861,7 @@ public final class ProtobufUtil {
|
||||||
Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
|
||||||
Append::add, proto, cellScanner);
|
Append::add, proto, cellScanner);
|
||||||
if (proto.hasTimeRange()) {
|
if (proto.hasTimeRange()) {
|
||||||
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||||
append.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
append.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
return append;
|
return append;
|
||||||
|
@ -881,7 +881,7 @@ public final class ProtobufUtil {
|
||||||
Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
|
Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
|
||||||
Increment::add, proto, cellScanner);
|
Increment::add, proto, cellScanner);
|
||||||
if (proto.hasTimeRange()) {
|
if (proto.hasTimeRange()) {
|
||||||
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||||
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
return increment;
|
return increment;
|
||||||
|
@ -953,7 +953,7 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (proto.hasTimeRange()) {
|
if (proto.hasTimeRange()) {
|
||||||
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||||
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
for (NameBytesPair attribute : proto.getAttributeList()) {
|
for (NameBytesPair attribute : proto.getAttributeList()) {
|
||||||
|
@ -1017,20 +1017,13 @@ public final class ProtobufUtil {
|
||||||
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
|
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
|
||||||
}
|
}
|
||||||
scanBuilder.setMaxVersions(scan.getMaxVersions());
|
scanBuilder.setMaxVersions(scan.getMaxVersions());
|
||||||
for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) {
|
scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
|
||||||
HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
|
scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
|
||||||
b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey()));
|
.setColumnFamily(UnsafeByteOperations.unsafeWrap(cf))
|
||||||
b.setTimeRange(timeRangeToProto(cftr.getValue()));
|
.setTimeRange(toTimeRange(timeRange))
|
||||||
scanBuilder.addCfTimeRange(b);
|
.build());
|
||||||
}
|
});
|
||||||
TimeRange timeRange = scan.getTimeRange();
|
scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(scan.getTimeRange()));
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
scanBuilder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
Map<String, byte[]> attributes = scan.getAttributesMap();
|
Map<String, byte[]> attributes = scan.getAttributesMap();
|
||||||
if (!attributes.isEmpty()) {
|
if (!attributes.isEmpty()) {
|
||||||
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
||||||
|
@ -1149,13 +1142,13 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
if (proto.getCfTimeRangeCount() > 0) {
|
if (proto.getCfTimeRangeCount() > 0) {
|
||||||
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
|
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
|
||||||
TimeRange timeRange = protoToTimeRange(cftr.getTimeRange());
|
TimeRange timeRange = toTimeRange(cftr.getTimeRange());
|
||||||
scan.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
|
scan.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
|
||||||
timeRange.getMin(), timeRange.getMax());
|
timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (proto.hasTimeRange()) {
|
if (proto.hasTimeRange()) {
|
||||||
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
|
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||||
scan.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
scan.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
}
|
}
|
||||||
if (proto.hasFilter()) {
|
if (proto.hasFilter()) {
|
||||||
|
@ -1245,20 +1238,13 @@ public final class ProtobufUtil {
|
||||||
if (get.getFilter() != null) {
|
if (get.getFilter() != null) {
|
||||||
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
|
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
|
||||||
}
|
}
|
||||||
for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) {
|
get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
|
||||||
HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
|
builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
|
||||||
b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey()));
|
.setColumnFamily(UnsafeByteOperations.unsafeWrap(cf))
|
||||||
b.setTimeRange(timeRangeToProto(cftr.getValue()));
|
.setTimeRange(toTimeRange(timeRange))
|
||||||
builder.addCfTimeRange(b);
|
.build());
|
||||||
}
|
});
|
||||||
TimeRange timeRange = get.getTimeRange();
|
builder.setTimeRange(ProtobufUtil.toTimeRange(get.getTimeRange()));
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
builder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
Map<String, byte[]> attributes = get.getAttributesMap();
|
Map<String, byte[]> attributes = get.getAttributesMap();
|
||||||
if (!attributes.isEmpty()) {
|
if (!attributes.isEmpty()) {
|
||||||
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
|
||||||
|
@ -1303,16 +1289,6 @@ public final class ProtobufUtil {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) {
|
|
||||||
if (!timeRange.isAllTime()) {
|
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
|
||||||
builder.setTimeRange(timeRangeBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
|
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return toMutation(type, mutation, HConstants.NO_NONCE);
|
return toMutation(type, mutation, HConstants.NO_NONCE);
|
||||||
|
@ -1344,12 +1320,10 @@ public final class ProtobufUtil {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
}
|
}
|
||||||
if (type == MutationType.INCREMENT) {
|
if (type == MutationType.INCREMENT) {
|
||||||
TimeRange timeRange = ((Increment) mutation).getTimeRange();
|
builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
|
||||||
setTimeRange(builder, timeRange);
|
|
||||||
}
|
}
|
||||||
if (type == MutationType.APPEND) {
|
if (type == MutationType.APPEND) {
|
||||||
TimeRange timeRange = ((Append) mutation).getTimeRange();
|
builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
|
||||||
setTimeRange(builder, timeRange);
|
|
||||||
}
|
}
|
||||||
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
|
||||||
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
|
||||||
|
@ -1407,10 +1381,10 @@ public final class ProtobufUtil {
|
||||||
getMutationBuilderAndSetCommonFields(type, mutation, builder);
|
getMutationBuilderAndSetCommonFields(type, mutation, builder);
|
||||||
builder.setAssociatedCellCount(mutation.size());
|
builder.setAssociatedCellCount(mutation.size());
|
||||||
if (mutation instanceof Increment) {
|
if (mutation instanceof Increment) {
|
||||||
setTimeRange(builder, ((Increment)mutation).getTimeRange());
|
builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
|
||||||
}
|
}
|
||||||
if (mutation instanceof Append) {
|
if (mutation instanceof Append) {
|
||||||
setTimeRange(builder, ((Append)mutation).getTimeRange());
|
builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
|
||||||
}
|
}
|
||||||
if (nonce != HConstants.NO_NONCE) {
|
if (nonce != HConstants.NO_NONCE) {
|
||||||
builder.setNonce(nonce);
|
builder.setNonce(nonce);
|
||||||
|
@ -2757,24 +2731,11 @@ public final class ProtobufUtil {
|
||||||
return scList;
|
return scList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) {
|
public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) {
|
||||||
HBaseProtos.TimeRange.Builder timeRangeBuilder =
|
return timeRange == null ?
|
||||||
HBaseProtos.TimeRange.newBuilder();
|
TimeRange.allTime() :
|
||||||
timeRangeBuilder.setFrom(timeRange.getMin());
|
new TimeRange(timeRange.hasFrom() ? timeRange.getFrom() : 0,
|
||||||
timeRangeBuilder.setTo(timeRange.getMax());
|
timeRange.hasTo() ? timeRange.getTo() : Long.MAX_VALUE);
|
||||||
return timeRangeBuilder;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException {
|
|
||||||
long minStamp = 0;
|
|
||||||
long maxStamp = Long.MAX_VALUE;
|
|
||||||
if (timeRange.hasFrom()) {
|
|
||||||
minStamp = timeRange.getFrom();
|
|
||||||
}
|
|
||||||
if (timeRange.hasTo()) {
|
|
||||||
maxStamp = timeRange.getTo();
|
|
||||||
}
|
|
||||||
return new TimeRange(minStamp, maxStamp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3229,4 +3190,13 @@ public final class ProtobufUtil {
|
||||||
.setTimeStampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp())
|
.setTimeStampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) {
|
||||||
|
if (timeRange == null) {
|
||||||
|
timeRange = TimeRange.allTime();
|
||||||
|
}
|
||||||
|
return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin())
|
||||||
|
.setTo(timeRange.getMax())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
@ -235,16 +236,9 @@ public final class RequestConverter {
|
||||||
public static MutateRequest buildMutateRequest(
|
public static MutateRequest buildMutateRequest(
|
||||||
final byte[] regionName, final byte[] row, final byte[] family,
|
final byte[] regionName, final byte[] row, final byte[] family,
|
||||||
final byte [] qualifier, final ByteArrayComparable comparator,
|
final byte [] qualifier, final ByteArrayComparable comparator,
|
||||||
final CompareType compareType, final Put put) throws IOException {
|
final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
|
||||||
MutateRequest.Builder builder = MutateRequest.newBuilder();
|
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
|
||||||
RegionSpecifier region = buildRegionSpecifier(
|
, put, MutationType.PUT);
|
||||||
RegionSpecifierType.REGION_NAME, regionName);
|
|
||||||
builder.setRegion(region);
|
|
||||||
Condition condition = buildCondition(
|
|
||||||
row, family, qualifier, comparator, compareType);
|
|
||||||
builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
|
|
||||||
builder.setCondition(condition);
|
|
||||||
return builder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -263,19 +257,21 @@ public final class RequestConverter {
|
||||||
public static MutateRequest buildMutateRequest(
|
public static MutateRequest buildMutateRequest(
|
||||||
final byte[] regionName, final byte[] row, final byte[] family,
|
final byte[] regionName, final byte[] row, final byte[] family,
|
||||||
final byte [] qualifier, final ByteArrayComparable comparator,
|
final byte [] qualifier, final ByteArrayComparable comparator,
|
||||||
final CompareType compareType, final Delete delete) throws IOException {
|
final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
|
||||||
MutateRequest.Builder builder = MutateRequest.newBuilder();
|
return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
|
||||||
RegionSpecifier region = buildRegionSpecifier(
|
, delete, MutationType.DELETE);
|
||||||
RegionSpecifierType.REGION_NAME, regionName);
|
|
||||||
builder.setRegion(region);
|
|
||||||
Condition condition = buildCondition(
|
|
||||||
row, family, qualifier, comparator, compareType);
|
|
||||||
builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
|
|
||||||
MutationProto.newBuilder()));
|
|
||||||
builder.setCondition(condition);
|
|
||||||
return builder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
|
||||||
|
final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
|
||||||
|
final CompareType compareType, TimeRange timeRange, final Mutation mutation,
|
||||||
|
final MutationType type) throws IOException {
|
||||||
|
return MutateRequest.newBuilder()
|
||||||
|
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
|
||||||
|
.setMutation(ProtobufUtil.toMutation(type, mutation))
|
||||||
|
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Create a protocol buffer MutateRequest for conditioned row mutations
|
* Create a protocol buffer MutateRequest for conditioned row mutations
|
||||||
*
|
*
|
||||||
|
@ -289,17 +285,15 @@ public final class RequestConverter {
|
||||||
* @return a mutate request
|
* @return a mutate request
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static ClientProtos.MultiRequest buildMutateRequest(
|
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
|
||||||
final byte[] regionName, final byte[] row, final byte[] family,
|
final byte[] row, final byte[] family, final byte[] qualifier,
|
||||||
final byte [] qualifier, final ByteArrayComparable comparator,
|
final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
|
||||||
final CompareType compareType, final RowMutations rowMutations) throws IOException {
|
final RowMutations rowMutations) throws IOException {
|
||||||
RegionAction.Builder builder =
|
RegionAction.Builder builder =
|
||||||
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
|
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
|
||||||
builder.setAtomic(true);
|
builder.setAtomic(true);
|
||||||
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
|
||||||
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
|
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
|
||||||
Condition condition = buildCondition(
|
|
||||||
row, family, qualifier, comparator, compareType);
|
|
||||||
for (Mutation mutation: rowMutations.getMutations()) {
|
for (Mutation mutation: rowMutations.getMutations()) {
|
||||||
MutationType mutateType = null;
|
MutationType mutateType = null;
|
||||||
if (mutation instanceof Put) {
|
if (mutation instanceof Put) {
|
||||||
|
@ -316,10 +310,9 @@ public final class RequestConverter {
|
||||||
actionBuilder.setMutation(mp);
|
actionBuilder.setMutation(mp);
|
||||||
builder.addAction(actionBuilder.build());
|
builder.addAction(actionBuilder.build());
|
||||||
}
|
}
|
||||||
ClientProtos.MultiRequest request =
|
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
|
||||||
ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
|
.setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
|
||||||
.setCondition(condition).build();
|
.build();
|
||||||
return request;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1100,16 +1093,16 @@ public final class RequestConverter {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Condition buildCondition(final byte[] row, final byte[] family,
|
public static Condition buildCondition(final byte[] row, final byte[] family,
|
||||||
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType)
|
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
|
||||||
throws IOException {
|
final TimeRange timeRange) {
|
||||||
Condition.Builder builder = Condition.newBuilder();
|
return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
|
||||||
builder.setRow(UnsafeByteOperations.unsafeWrap(row));
|
.setFamily(UnsafeByteOperations.unsafeWrap(family))
|
||||||
builder.setFamily(UnsafeByteOperations.unsafeWrap(family));
|
.setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
|
||||||
builder.setQualifier(UnsafeByteOperations
|
HConstants.EMPTY_BYTE_ARRAY : qualifier))
|
||||||
.unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier));
|
.setComparator(ProtobufUtil.toComparator(comparator))
|
||||||
builder.setComparator(ProtobufUtil.toComparator(comparator));
|
.setCompareType(compareType)
|
||||||
builder.setCompareType(compareType);
|
.setTimeRange(ProtobufUtil.toTimeRange(timeRange))
|
||||||
return builder.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -110,7 +111,7 @@ public class TestProtobufUtil {
|
||||||
getBuilder = ClientProtos.Get.newBuilder(proto);
|
getBuilder = ClientProtos.Get.newBuilder(proto);
|
||||||
getBuilder.setMaxVersions(1);
|
getBuilder.setMaxVersions(1);
|
||||||
getBuilder.setCacheBlocks(true);
|
getBuilder.setCacheBlocks(true);
|
||||||
|
getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
|
||||||
Get get = ProtobufUtil.toGet(proto);
|
Get get = ProtobufUtil.toGet(proto);
|
||||||
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
|
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
|
||||||
}
|
}
|
||||||
|
@ -244,6 +245,7 @@ public class TestProtobufUtil {
|
||||||
scanBuilder.setMaxVersions(2);
|
scanBuilder.setMaxVersions(2);
|
||||||
scanBuilder.setCacheBlocks(false);
|
scanBuilder.setCacheBlocks(false);
|
||||||
scanBuilder.setCaching(1024);
|
scanBuilder.setCaching(1024);
|
||||||
|
scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
|
||||||
ClientProtos.Scan expectedProto = scanBuilder.build();
|
ClientProtos.Scan expectedProto = scanBuilder.build();
|
||||||
|
|
||||||
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
|
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
|
||||||
|
@ -305,6 +307,7 @@ public class TestProtobufUtil {
|
||||||
|
|
||||||
Increment increment = ProtobufUtil.toIncrement(proto, null);
|
Increment increment = ProtobufUtil.toIncrement(proto, null);
|
||||||
mutateBuilder.setTimestamp(increment.getTimeStamp());
|
mutateBuilder.setTimestamp(increment.getTimeStamp());
|
||||||
|
mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange()));
|
||||||
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
|
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,6 +348,7 @@ public class TestProtobufUtil {
|
||||||
// append always use the latest timestamp,
|
// append always use the latest timestamp,
|
||||||
// reset the timestamp to the original mutate
|
// reset the timestamp to the original mutate
|
||||||
mutateBuilder.setTimestamp(append.getTimeStamp());
|
mutateBuilder.setTimestamp(append.getTimeStamp());
|
||||||
|
mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange()));
|
||||||
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
|
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,20 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
public class TimeRange {
|
public class TimeRange {
|
||||||
public static final long INITIAL_MIN_TIMESTAMP = 0L;
|
public static final long INITIAL_MIN_TIMESTAMP = 0L;
|
||||||
public static final long INITIAL_MAX_TIMESTAMP = Long.MAX_VALUE;
|
public static final long INITIAL_MAX_TIMESTAMP = Long.MAX_VALUE;
|
||||||
|
private static final TimeRange ALL_TIME = new TimeRange(INITIAL_MIN_TIMESTAMP,
|
||||||
|
INITIAL_MAX_TIMESTAMP);
|
||||||
|
|
||||||
|
public static TimeRange allTime() {
|
||||||
|
return ALL_TIME;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TimeRange at(long ts) {
|
||||||
|
if (ts < 0 || ts == Long.MAX_VALUE) {
|
||||||
|
throw new IllegalArgumentException("invalid ts:" + ts);
|
||||||
|
}
|
||||||
|
return new TimeRange(ts, ts + 1);
|
||||||
|
}
|
||||||
|
|
||||||
private final long minStamp;
|
private final long minStamp;
|
||||||
private final long maxStamp;
|
private final long maxStamp;
|
||||||
private final boolean allTime;
|
private final boolean allTime;
|
||||||
|
@ -150,7 +164,10 @@ public class TimeRange {
|
||||||
* @param bytes timestamp to check
|
* @param bytes timestamp to check
|
||||||
* @param offset offset into the bytes
|
* @param offset offset into the bytes
|
||||||
* @return true if within TimeRange, false if not
|
* @return true if within TimeRange, false if not
|
||||||
|
* @deprecated This is made @InterfaceAudience.Private in the 2.0 line and above and may be
|
||||||
|
* changed to private or removed in 3.0. Use {@link #withinTimeRange(long)} instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public boolean withinTimeRange(byte [] bytes, int offset) {
|
public boolean withinTimeRange(byte [] bytes, int offset) {
|
||||||
if (allTime) {
|
if (allTime) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -143,6 +143,7 @@ message Condition {
|
||||||
required bytes qualifier = 3;
|
required bytes qualifier = 3;
|
||||||
required CompareType compare_type = 4;
|
required CompareType compare_type = 4;
|
||||||
required Comparator comparator = 5;
|
required Comparator comparator = 5;
|
||||||
|
optional TimeRange time_range = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -143,6 +143,7 @@ message Condition {
|
||||||
required bytes qualifier = 3;
|
required bytes qualifier = 3;
|
||||||
required CompareType compare_type = 4;
|
required CompareType compare_type = 4;
|
||||||
required Comparator comparator = 5;
|
required Comparator comparator = 5;
|
||||||
|
optional TimeRange time_range = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -975,6 +975,11 @@ public class RemoteHTable implements Table {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
|
||||||
|
throw new UnsupportedOperationException("timeRange not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CheckAndMutateBuilder ifNotExists() {
|
public CheckAndMutateBuilder ifNotExists() {
|
||||||
throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison "
|
throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison "
|
||||||
|
|
|
@ -4008,28 +4008,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
|
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||||
CompareOperator op, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL)
|
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
|
||||||
throws IOException{
|
|
||||||
checkMutationType(mutation, row);
|
checkMutationType(mutation, row);
|
||||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, null,
|
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
|
||||||
mutation);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
|
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||||
CompareOperator op, ByteArrayComparable comparator, RowMutations rm)
|
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
|
||||||
throws IOException {
|
return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
|
||||||
return doCheckAndRowMutate(row, family, qualifier, op, comparator, rm, null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
|
* checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
|
||||||
* switches in the few places where there is deviation.
|
* switches in the few places where there is deviation.
|
||||||
*/
|
*/
|
||||||
private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
|
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
|
||||||
CompareOperator op, ByteArrayComparable comparator, RowMutations rowMutations,
|
CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
|
||||||
Mutation mutation)
|
RowMutations rowMutations, Mutation mutation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
|
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
|
||||||
// One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
|
// One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
|
||||||
|
@ -4044,6 +4041,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
Get get = new Get(row);
|
Get get = new Get(row);
|
||||||
checkFamily(family);
|
checkFamily(family);
|
||||||
get.addColumn(family, qualifier);
|
get.addColumn(family, qualifier);
|
||||||
|
if (timeRange != null) {
|
||||||
|
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||||
|
}
|
||||||
// Lock row - note that doBatchMutate will relock this row if called
|
// Lock row - note that doBatchMutate will relock this row if called
|
||||||
checkRow(row, "doCheckAndRowMutate");
|
checkRow(row, "doCheckAndRowMutate");
|
||||||
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
|
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
|
||||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
|
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||||
|
@ -593,9 +594,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
* @param cellScanner if non-null, the mutation data -- the Cell content.
|
||||||
*/
|
*/
|
||||||
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
|
||||||
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
|
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||||
CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder,
|
ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
|
||||||
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
|
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
|
||||||
int countOfCompleteMutation = 0;
|
int countOfCompleteMutation = 0;
|
||||||
try {
|
try {
|
||||||
if (!region.getRegionInfo().isMetaRegion()) {
|
if (!region.getRegionInfo().isMetaRegion()) {
|
||||||
|
@ -638,7 +639,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
builder.addResultOrException(
|
builder.addResultOrException(
|
||||||
resultOrExceptionOrBuilder.build());
|
resultOrExceptionOrBuilder.build());
|
||||||
}
|
}
|
||||||
return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm);
|
return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
|
||||||
} finally {
|
} finally {
|
||||||
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
|
||||||
// even if the malformed cells are not skipped.
|
// even if the malformed cells are not skipped.
|
||||||
|
@ -2640,9 +2641,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
CompareOperator.valueOf(condition.getCompareType().name());
|
CompareOperator.valueOf(condition.getCompareType().name());
|
||||||
ByteArrayComparable comparator =
|
ByteArrayComparable comparator =
|
||||||
ProtobufUtil.toComparator(condition.getComparator());
|
ProtobufUtil.toComparator(condition.getComparator());
|
||||||
processed = checkAndRowMutate(region, regionAction.getActionList(),
|
TimeRange timeRange = condition.hasTimeRange() ?
|
||||||
cellScanner, row, family, qualifier, op,
|
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||||
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
|
TimeRange.allTime();
|
||||||
|
processed =
|
||||||
|
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
|
||||||
|
qualifier, op, comparator, timeRange, regionActionResultBuilder,
|
||||||
|
spaceQuotaEnforcement);
|
||||||
} else {
|
} else {
|
||||||
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
|
||||||
cellScanner, spaceQuotaEnforcement);
|
cellScanner, spaceQuotaEnforcement);
|
||||||
|
@ -2763,79 +2768,84 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
|
spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
|
||||||
|
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case APPEND:
|
case APPEND:
|
||||||
// TODO: this doesn't actually check anything.
|
// TODO: this doesn't actually check anything.
|
||||||
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
|
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
|
||||||
break;
|
break;
|
||||||
case INCREMENT:
|
case INCREMENT:
|
||||||
// TODO: this doesn't actually check anything.
|
// TODO: this doesn't actually check anything.
|
||||||
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
|
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
|
||||||
break;
|
break;
|
||||||
case PUT:
|
case PUT:
|
||||||
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
Put put = ProtobufUtil.toPut(mutation, cellScanner);
|
||||||
checkCellSizeLimit(region, put);
|
checkCellSizeLimit(region, put);
|
||||||
// Throws an exception when violated
|
// Throws an exception when violated
|
||||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
|
||||||
quota.addMutation(put);
|
quota.addMutation(put);
|
||||||
if (request.hasCondition()) {
|
if (request.hasCondition()) {
|
||||||
Condition condition = request.getCondition();
|
Condition condition = request.getCondition();
|
||||||
byte[] row = condition.getRow().toByteArray();
|
byte[] row = condition.getRow().toByteArray();
|
||||||
byte[] family = condition.getFamily().toByteArray();
|
byte[] family = condition.getFamily().toByteArray();
|
||||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||||
CompareOperator compareOp =
|
CompareOperator compareOp =
|
||||||
CompareOperator.valueOf(condition.getCompareType().name());
|
CompareOperator.valueOf(condition.getCompareType().name());
|
||||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||||
if (region.getCoprocessorHost() != null) {
|
TimeRange timeRange = condition.hasTimeRange() ?
|
||||||
processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
|
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||||
compareOp, comparator, put);
|
TimeRange.allTime();
|
||||||
}
|
|
||||||
if (processed == null) {
|
|
||||||
boolean result = region.checkAndMutate(row, family,
|
|
||||||
qualifier, compareOp, comparator, put, true);
|
|
||||||
if (region.getCoprocessorHost() != null) {
|
if (region.getCoprocessorHost() != null) {
|
||||||
result = region.getCoprocessorHost().postCheckAndPut(row, family,
|
processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
|
||||||
qualifier, compareOp, comparator, put, result);
|
compareOp, comparator, put);
|
||||||
}
|
}
|
||||||
processed = result;
|
if (processed == null) {
|
||||||
|
boolean result = region.checkAndMutate(row, family,
|
||||||
|
qualifier, compareOp, comparator, timeRange, put);
|
||||||
|
if (region.getCoprocessorHost() != null) {
|
||||||
|
result = region.getCoprocessorHost().postCheckAndPut(row, family,
|
||||||
|
qualifier, compareOp, comparator, put, result);
|
||||||
|
}
|
||||||
|
processed = result;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
region.put(put);
|
||||||
|
processed = Boolean.TRUE;
|
||||||
}
|
}
|
||||||
} else {
|
break;
|
||||||
region.put(put);
|
case DELETE:
|
||||||
processed = Boolean.TRUE;
|
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
|
||||||
}
|
checkCellSizeLimit(region, delete);
|
||||||
break;
|
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
|
||||||
case DELETE:
|
quota.addMutation(delete);
|
||||||
Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
|
if (request.hasCondition()) {
|
||||||
checkCellSizeLimit(region, delete);
|
Condition condition = request.getCondition();
|
||||||
spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
|
byte[] row = condition.getRow().toByteArray();
|
||||||
quota.addMutation(delete);
|
byte[] family = condition.getFamily().toByteArray();
|
||||||
if (request.hasCondition()) {
|
byte[] qualifier = condition.getQualifier().toByteArray();
|
||||||
Condition condition = request.getCondition();
|
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
|
||||||
byte[] row = condition.getRow().toByteArray();
|
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
||||||
byte[] family = condition.getFamily().toByteArray();
|
TimeRange timeRange = condition.hasTimeRange() ?
|
||||||
byte[] qualifier = condition.getQualifier().toByteArray();
|
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
|
||||||
CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
|
TimeRange.allTime();
|
||||||
ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
|
|
||||||
if (region.getCoprocessorHost() != null) {
|
|
||||||
processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
|
|
||||||
comparator, delete);
|
|
||||||
}
|
|
||||||
if (processed == null) {
|
|
||||||
boolean result = region.checkAndMutate(row, family,
|
|
||||||
qualifier, op, comparator, delete, true);
|
|
||||||
if (region.getCoprocessorHost() != null) {
|
if (region.getCoprocessorHost() != null) {
|
||||||
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
|
processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
|
||||||
qualifier, op, comparator, delete, result);
|
comparator, delete);
|
||||||
}
|
}
|
||||||
processed = result;
|
if (processed == null) {
|
||||||
|
boolean result = region.checkAndMutate(row, family,
|
||||||
|
qualifier, op, comparator, timeRange, delete);
|
||||||
|
if (region.getCoprocessorHost() != null) {
|
||||||
|
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
|
||||||
|
qualifier, op, comparator, delete, result);
|
||||||
|
}
|
||||||
|
processed = result;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
region.delete(delete);
|
||||||
|
processed = Boolean.TRUE;
|
||||||
}
|
}
|
||||||
} else {
|
break;
|
||||||
region.delete(delete);
|
default:
|
||||||
processed = Boolean.TRUE;
|
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new DoNotRetryIOException(
|
|
||||||
"Unsupported mutate type: " + type.name());
|
|
||||||
}
|
}
|
||||||
if (processed != null) {
|
if (processed != null) {
|
||||||
builder.setProcessed(processed.booleanValue());
|
builder.setProcessed(processed.booleanValue());
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
@ -303,14 +304,31 @@ public interface Region extends ConfigurationObserver {
|
||||||
* @param family column family to check
|
* @param family column family to check
|
||||||
* @param qualifier column qualifier to check
|
* @param qualifier column qualifier to check
|
||||||
* @param op the comparison operator
|
* @param op the comparison operator
|
||||||
* @param comparator
|
* @param comparator the expected value
|
||||||
* @param mutation
|
* @param mutation data to put if check succeeds
|
||||||
* @param writeToWAL
|
* @return true if mutation was applied, false otherwise
|
||||||
|
*/
|
||||||
|
default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||||
|
ByteArrayComparable comparator, Mutation mutation) throws IOException {
|
||||||
|
return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically checks if a row/family/qualifier value matches the expected value and if it does,
|
||||||
|
* it performs the mutation. If the passed value is null, the lack of column value
|
||||||
|
* (ie: non-existence) is used. See checkAndRowMutate to do many checkAndPuts at a time on a
|
||||||
|
* single row.
|
||||||
|
* @param row to check
|
||||||
|
* @param family column family to check
|
||||||
|
* @param qualifier column qualifier to check
|
||||||
|
* @param op the comparison operator
|
||||||
|
* @param comparator the expected value
|
||||||
|
* @param mutation data to put if check succeeds
|
||||||
|
* @param timeRange time range to check
|
||||||
* @return true if mutation was applied, false otherwise
|
* @return true if mutation was applied, false otherwise
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||||
ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException;
|
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
|
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
|
||||||
|
@ -321,13 +339,32 @@ public interface Region extends ConfigurationObserver {
|
||||||
* @param family column family to check
|
* @param family column family to check
|
||||||
* @param qualifier column qualifier to check
|
* @param qualifier column qualifier to check
|
||||||
* @param op the comparison operator
|
* @param op the comparison operator
|
||||||
* @param comparator
|
* @param comparator the expected value
|
||||||
* @param mutations
|
* @param mutations data to put if check succeeds
|
||||||
|
* @return true if mutations were applied, false otherwise
|
||||||
|
*/
|
||||||
|
default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
|
||||||
|
ByteArrayComparable comparator, RowMutations mutations) throws IOException {
|
||||||
|
return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
|
||||||
|
mutations);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
|
||||||
|
* it performs the row mutations. If the passed value is null, the lack of column value
|
||||||
|
* (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate
|
||||||
|
* to do one checkAndMutate at a time.
|
||||||
|
* @param row to check
|
||||||
|
* @param family column family to check
|
||||||
|
* @param qualifier column qualifier to check
|
||||||
|
* @param op the comparison operator
|
||||||
|
* @param comparator the expected value
|
||||||
|
* @param mutations data to put if check succeeds
|
||||||
|
* @param timeRange time range to check
|
||||||
* @return true if mutations were applied, false otherwise
|
* @return true if mutations were applied, false otherwise
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
|
||||||
ByteArrayComparable comparator, RowMutations mutations)
|
ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -338,4 +339,66 @@ public class TestAsyncTable {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAndMutateWithTimeRange() throws Exception {
|
||||||
|
TEST_UTIL.createTable(TableName.valueOf("testCheckAndMutateWithTimeRange"), FAMILY);
|
||||||
|
AsyncTable<?> table = getTable.get();
|
||||||
|
final long ts = System.currentTimeMillis() / 2;
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
|
||||||
|
|
||||||
|
boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.ifNotExists()
|
||||||
|
.thenPut(put)
|
||||||
|
.get();
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenPut(put)
|
||||||
|
.get();
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenPut(put)
|
||||||
|
.get();
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
RowMutations rm = new RowMutations(row)
|
||||||
|
.add((Mutation) put);
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenMutate(rm)
|
||||||
|
.get();
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenMutate(rm)
|
||||||
|
.get();
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
Delete delete = new Delete(row)
|
||||||
|
.addColumn(FAMILY, QUALIFIER);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenDelete(delete)
|
||||||
|
.get();
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenDelete(delete)
|
||||||
|
.get();
|
||||||
|
assertTrue(ok);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||||
import org.apache.hadoop.hbase.filter.SubstringComparator;
|
import org.apache.hadoop.hbase.filter.SubstringComparator;
|
||||||
import org.apache.hadoop.hbase.filter.ValueFilter;
|
import org.apache.hadoop.hbase.filter.ValueFilter;
|
||||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
|
@ -4831,6 +4832,60 @@ public class TestFromClientSide {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAndMutateWithTimeRange() throws IOException {
|
||||||
|
Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY);
|
||||||
|
final long ts = System.currentTimeMillis() / 2;
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
|
||||||
|
|
||||||
|
boolean ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.ifNotExists()
|
||||||
|
.thenPut(put);
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenPut(put);
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenPut(put);
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
RowMutations rm = new RowMutations(ROW)
|
||||||
|
.add((Mutation) put);
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenMutate(rm);
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenMutate(rm);
|
||||||
|
assertTrue(ok);
|
||||||
|
|
||||||
|
Delete delete = new Delete(ROW)
|
||||||
|
.addColumn(FAMILY, QUALIFIER);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts + 10000))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenDelete(delete);
|
||||||
|
assertFalse(ok);
|
||||||
|
|
||||||
|
ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
|
||||||
|
.timeRange(TimeRange.at(ts))
|
||||||
|
.ifEquals(VALUE)
|
||||||
|
.thenDelete(delete);
|
||||||
|
assertTrue(ok);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckAndPutWithCompareOp() throws IOException {
|
public void testCheckAndPutWithCompareOp() throws IOException {
|
||||||
final byte [] value1 = Bytes.toBytes("aaaa");
|
final byte [] value1 = Bytes.toBytes("aaaa");
|
||||||
|
|
|
@ -239,7 +239,7 @@ public class TestMalformedCellFromClient {
|
||||||
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
|
||||||
ClientProtos.Condition condition = RequestConverter
|
ClientProtos.Condition condition = RequestConverter
|
||||||
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
|
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
|
||||||
HBaseProtos.CompareType.EQUAL);
|
HBaseProtos.CompareType.EQUAL, null);
|
||||||
for (Mutation mutation : rm.getMutations()) {
|
for (Mutation mutation : rm.getMutations()) {
|
||||||
ClientProtos.MutationProto.MutationType mutateType = null;
|
ClientProtos.MutationProto.MutationType mutateType = null;
|
||||||
if (mutation instanceof Put) {
|
if (mutation instanceof Put) {
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
|
||||||
|
@ -104,6 +105,7 @@ public class TestProtobufUtil {
|
||||||
getBuilder = ClientProtos.Get.newBuilder(proto);
|
getBuilder = ClientProtos.Get.newBuilder(proto);
|
||||||
getBuilder.setMaxVersions(1);
|
getBuilder.setMaxVersions(1);
|
||||||
getBuilder.setCacheBlocks(true);
|
getBuilder.setCacheBlocks(true);
|
||||||
|
getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
|
||||||
|
|
||||||
Get get = ProtobufUtil.toGet(proto);
|
Get get = ProtobufUtil.toGet(proto);
|
||||||
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
|
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
|
||||||
|
@ -146,6 +148,7 @@ public class TestProtobufUtil {
|
||||||
// append always use the latest timestamp,
|
// append always use the latest timestamp,
|
||||||
// reset the timestamp to the original mutate
|
// reset the timestamp to the original mutate
|
||||||
mutateBuilder.setTimestamp(append.getTimeStamp());
|
mutateBuilder.setTimestamp(append.getTimeStamp());
|
||||||
|
mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange()));
|
||||||
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
|
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,6 +232,7 @@ public class TestProtobufUtil {
|
||||||
|
|
||||||
Increment increment = ProtobufUtil.toIncrement(proto, null);
|
Increment increment = ProtobufUtil.toIncrement(proto, null);
|
||||||
mutateBuilder.setTimestamp(increment.getTimeStamp());
|
mutateBuilder.setTimestamp(increment.getTimeStamp());
|
||||||
|
mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange()));
|
||||||
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
|
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,6 +318,7 @@ public class TestProtobufUtil {
|
||||||
scanBuilder.setMaxVersions(2);
|
scanBuilder.setMaxVersions(2);
|
||||||
scanBuilder.setCacheBlocks(false);
|
scanBuilder.setCacheBlocks(false);
|
||||||
scanBuilder.setCaching(1024);
|
scanBuilder.setCaching(1024);
|
||||||
|
scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
|
||||||
ClientProtos.Scan expectedProto = scanBuilder.build();
|
ClientProtos.Scan expectedProto = scanBuilder.build();
|
||||||
|
|
||||||
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
|
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
|
||||||
|
|
|
@ -663,7 +663,7 @@ public class TestAtomicOperation {
|
||||||
}
|
}
|
||||||
testStep = TestStep.CHECKANDPUT_STARTED;
|
testStep = TestStep.CHECKANDPUT_STARTED;
|
||||||
region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
|
region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
|
||||||
CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
|
CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
|
||||||
testStep = TestStep.CHECKANDPUT_COMPLETED;
|
testStep = TestStep.CHECKANDPUT_COMPLETED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1738,7 +1738,7 @@ public class TestHRegion {
|
||||||
|
|
||||||
// checkAndPut with empty value
|
// checkAndPut with empty value
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
||||||
emptyVal), put, true);
|
emptyVal), put);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
|
|
||||||
// Putting data in key
|
// Putting data in key
|
||||||
|
@ -1747,25 +1747,25 @@ public class TestHRegion {
|
||||||
|
|
||||||
// checkAndPut with correct value
|
// checkAndPut with correct value
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
||||||
put, true);
|
put);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
|
|
||||||
// not empty anymore
|
// not empty anymore
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
||||||
put, true);
|
put);
|
||||||
assertFalse(res);
|
assertFalse(res);
|
||||||
|
|
||||||
Delete delete = new Delete(row1);
|
Delete delete = new Delete(row1);
|
||||||
delete.addColumn(fam1, qf1);
|
delete.addColumn(fam1, qf1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
||||||
delete, true);
|
delete);
|
||||||
assertFalse(res);
|
assertFalse(res);
|
||||||
|
|
||||||
put = new Put(row1);
|
put = new Put(row1);
|
||||||
put.addColumn(fam1, qf1, val2);
|
put.addColumn(fam1, qf1, val2);
|
||||||
// checkAndPut with correct value
|
// checkAndPut with correct value
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
||||||
put, true);
|
put);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
|
|
||||||
// checkAndDelete with correct value
|
// checkAndDelete with correct value
|
||||||
|
@ -1773,12 +1773,12 @@ public class TestHRegion {
|
||||||
delete.addColumn(fam1, qf1);
|
delete.addColumn(fam1, qf1);
|
||||||
delete.addColumn(fam1, qf1);
|
delete.addColumn(fam1, qf1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
|
||||||
delete, true);
|
delete);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
|
|
||||||
delete = new Delete(row1);
|
delete = new Delete(row1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
||||||
delete, true);
|
delete);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
|
|
||||||
// checkAndPut looking for a null value
|
// checkAndPut looking for a null value
|
||||||
|
@ -1786,7 +1786,7 @@ public class TestHRegion {
|
||||||
put.addColumn(fam1, qf1, val1);
|
put.addColumn(fam1, qf1, val1);
|
||||||
|
|
||||||
res = region
|
res = region
|
||||||
.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put, true);
|
.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
|
||||||
assertTrue(res);
|
assertTrue(res);
|
||||||
} finally {
|
} finally {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
@ -1814,14 +1814,14 @@ public class TestHRegion {
|
||||||
|
|
||||||
// checkAndPut with wrong value
|
// checkAndPut with wrong value
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
||||||
val2), put, true);
|
val2), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// checkAndDelete with wrong value
|
// checkAndDelete with wrong value
|
||||||
Delete delete = new Delete(row1);
|
Delete delete = new Delete(row1);
|
||||||
delete.addFamily(fam1);
|
delete.addFamily(fam1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
|
||||||
put, true);
|
put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Putting data in key
|
// Putting data in key
|
||||||
|
@ -1832,7 +1832,7 @@ public class TestHRegion {
|
||||||
// checkAndPut with wrong value
|
// checkAndPut with wrong value
|
||||||
res =
|
res =
|
||||||
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
||||||
bd2), put, true);
|
bd2), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// checkAndDelete with wrong value
|
// checkAndDelete with wrong value
|
||||||
|
@ -1840,7 +1840,7 @@ public class TestHRegion {
|
||||||
delete.addFamily(fam1);
|
delete.addFamily(fam1);
|
||||||
res =
|
res =
|
||||||
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
||||||
bd2), put, true);
|
bd2), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
} finally {
|
} finally {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
@ -1866,14 +1866,14 @@ public class TestHRegion {
|
||||||
|
|
||||||
// checkAndPut with correct value
|
// checkAndPut with correct value
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
||||||
val1), put, true);
|
val1), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// checkAndDelete with correct value
|
// checkAndDelete with correct value
|
||||||
Delete delete = new Delete(row1);
|
Delete delete = new Delete(row1);
|
||||||
delete.addColumn(fam1, qf1);
|
delete.addColumn(fam1, qf1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
||||||
delete, true);
|
delete);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Putting data in key
|
// Putting data in key
|
||||||
|
@ -1884,7 +1884,7 @@ public class TestHRegion {
|
||||||
// checkAndPut with correct value
|
// checkAndPut with correct value
|
||||||
res =
|
res =
|
||||||
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
||||||
bd1), put, true);
|
bd1), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// checkAndDelete with correct value
|
// checkAndDelete with correct value
|
||||||
|
@ -1892,7 +1892,7 @@ public class TestHRegion {
|
||||||
delete.addColumn(fam1, qf1);
|
delete.addColumn(fam1, qf1);
|
||||||
res =
|
res =
|
||||||
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
|
||||||
bd1), delete, true);
|
bd1), delete);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
} finally {
|
} finally {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
@ -1920,12 +1920,12 @@ public class TestHRegion {
|
||||||
|
|
||||||
// Test CompareOp.LESS: original = val3, compare with val3, fail
|
// Test CompareOp.LESS: original = val3, compare with val3, fail
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
||||||
new BinaryComparator(val3), put, true);
|
new BinaryComparator(val3), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.LESS: original = val3, compare with val4, fail
|
// Test CompareOp.LESS: original = val3, compare with val4, fail
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
||||||
new BinaryComparator(val4), put, true);
|
new BinaryComparator(val4), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.LESS: original = val3, compare with val2,
|
// Test CompareOp.LESS: original = val3, compare with val2,
|
||||||
|
@ -1933,18 +1933,18 @@ public class TestHRegion {
|
||||||
put = new Put(row1);
|
put = new Put(row1);
|
||||||
put.addColumn(fam1, qf1, val2);
|
put.addColumn(fam1, qf1, val2);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
|
||||||
new BinaryComparator(val2), put, true);
|
new BinaryComparator(val2), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
|
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
||||||
new BinaryComparator(val3), put, true);
|
new BinaryComparator(val3), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
|
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
|
||||||
// succeed (value still = val2)
|
// succeed (value still = val2)
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
||||||
new BinaryComparator(val2), put, true);
|
new BinaryComparator(val2), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
|
// Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
|
||||||
|
@ -1952,17 +1952,17 @@ public class TestHRegion {
|
||||||
put = new Put(row1);
|
put = new Put(row1);
|
||||||
put.addColumn(fam1, qf1, val3);
|
put.addColumn(fam1, qf1, val3);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
|
||||||
new BinaryComparator(val1), put, true);
|
new BinaryComparator(val1), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER: original = val3, compare with val3, fail
|
// Test CompareOp.GREATER: original = val3, compare with val3, fail
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
||||||
new BinaryComparator(val3), put, true);
|
new BinaryComparator(val3), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER: original = val3, compare with val2, fail
|
// Test CompareOp.GREATER: original = val3, compare with val2, fail
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
||||||
new BinaryComparator(val2), put, true);
|
new BinaryComparator(val2), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER: original = val3, compare with val4,
|
// Test CompareOp.GREATER: original = val3, compare with val4,
|
||||||
|
@ -1970,23 +1970,23 @@ public class TestHRegion {
|
||||||
put = new Put(row1);
|
put = new Put(row1);
|
||||||
put.addColumn(fam1, qf1, val2);
|
put.addColumn(fam1, qf1, val2);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
|
||||||
new BinaryComparator(val4), put, true);
|
new BinaryComparator(val4), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
|
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
||||||
new BinaryComparator(val1), put, true);
|
new BinaryComparator(val1), put);
|
||||||
assertEquals(false, res);
|
assertEquals(false, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
|
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
|
||||||
// succeed (value still = val2)
|
// succeed (value still = val2)
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
||||||
new BinaryComparator(val2), put, true);
|
new BinaryComparator(val2), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
|
// Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
|
||||||
new BinaryComparator(val3), put, true);
|
new BinaryComparator(val3), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
} finally {
|
} finally {
|
||||||
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
@ -2021,7 +2021,7 @@ public class TestHRegion {
|
||||||
|
|
||||||
// checkAndPut with wrong value
|
// checkAndPut with wrong value
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
||||||
val1), put, true);
|
val1), put);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
Get get = new Get(row1);
|
Get get = new Get(row1);
|
||||||
|
@ -2048,7 +2048,7 @@ public class TestHRegion {
|
||||||
put.addColumn(fam1, qual1, value1);
|
put.addColumn(fam1, qual1, value1);
|
||||||
try {
|
try {
|
||||||
region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
|
region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
|
||||||
new BinaryComparator(value2), put, false);
|
new BinaryComparator(value2), put);
|
||||||
fail();
|
fail();
|
||||||
} catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
|
} catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
|
||||||
// expected exception.
|
// expected exception.
|
||||||
|
@ -2097,7 +2097,7 @@ public class TestHRegion {
|
||||||
delete.addColumn(fam2, qf1);
|
delete.addColumn(fam2, qf1);
|
||||||
delete.addColumn(fam1, qf3);
|
delete.addColumn(fam1, qf3);
|
||||||
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
|
||||||
val2), delete, true);
|
val2), delete);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
Get get = new Get(row1);
|
Get get = new Get(row1);
|
||||||
|
@ -2113,7 +2113,7 @@ public class TestHRegion {
|
||||||
delete = new Delete(row1);
|
delete = new Delete(row1);
|
||||||
delete.addFamily(fam2);
|
delete.addFamily(fam2);
|
||||||
res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
|
||||||
delete, true);
|
delete);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
|
|
||||||
get = new Get(row1);
|
get = new Get(row1);
|
||||||
|
@ -2124,7 +2124,7 @@ public class TestHRegion {
|
||||||
// Row delete
|
// Row delete
|
||||||
delete = new Delete(row1);
|
delete = new Delete(row1);
|
||||||
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
|
||||||
delete, true);
|
delete);
|
||||||
assertEquals(true, res);
|
assertEquals(true, res);
|
||||||
get = new Get(row1);
|
get = new Get(row1);
|
||||||
r = region.get(get);
|
r = region.get(get);
|
||||||
|
@ -6260,7 +6260,7 @@ public class TestHRegion {
|
||||||
p = new Put(row);
|
p = new Put(row);
|
||||||
p.setDurability(Durability.SKIP_WAL);
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
p.addColumn(fam1, qual1, qual2);
|
p.addColumn(fam1, qual1, qual2);
|
||||||
region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p, false);
|
region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
|
||||||
result = region.get(new Get(row));
|
result = region.get(new Get(row));
|
||||||
c = result.getColumnLatestCell(fam1, qual1);
|
c = result.getColumnLatestCell(fam1, qual1);
|
||||||
assertEquals(10L, c.getTimestamp());
|
assertEquals(10L, c.getTimestamp());
|
||||||
|
|
|
@ -50,13 +50,13 @@ public class TestSimpleTimeRangeTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExtreme() {
|
public void testExtreme() {
|
||||||
TimeRange tr = new TimeRange();
|
TimeRange tr = TimeRange.allTime();
|
||||||
assertTrue(tr.includesTimeRange(new TimeRange()));
|
assertTrue(tr.includesTimeRange(TimeRange.allTime()));
|
||||||
TimeRangeTracker trt = getTimeRangeTracker();
|
TimeRangeTracker trt = getTimeRangeTracker();
|
||||||
assertFalse(trt.includesTimeRange(new TimeRange()));
|
assertFalse(trt.includesTimeRange(TimeRange.allTime()));
|
||||||
trt.includeTimestamp(1);
|
trt.includeTimestamp(1);
|
||||||
trt.includeTimestamp(10);
|
trt.includeTimestamp(10);
|
||||||
assertTrue(trt.includesTimeRange(new TimeRange()));
|
assertTrue(trt.includesTimeRange(TimeRange.allTime()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -114,7 +114,7 @@ public class TestSimpleTimeRangeTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRangeConstruction() throws IOException {
|
public void testRangeConstruction() throws IOException {
|
||||||
TimeRange defaultRange = new TimeRange();
|
TimeRange defaultRange = TimeRange.allTime();
|
||||||
assertEquals(0L, defaultRange.getMin());
|
assertEquals(0L, defaultRange.getMin());
|
||||||
assertEquals(Long.MAX_VALUE, defaultRange.getMax());
|
assertEquals(Long.MAX_VALUE, defaultRange.getMax());
|
||||||
assertTrue(defaultRange.isAllTime());
|
assertTrue(defaultRange.isAllTime());
|
||||||
|
|
Loading…
Reference in New Issue