HBASE-15475 Allow TimestampsFilter to provide a seek hint
This commit is contained in:
parent
a3d550fbca
commit
604415e827
|
@ -19,16 +19,18 @@ package org.apache.hadoop.hbase.filter;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
|
||||
|
||||
/**
|
||||
* Filter that returns only cells whose timestamp (version) is
|
||||
* in the specified list of timestamps (versions).
|
||||
|
@ -42,6 +44,7 @@ import java.util.TreeSet;
|
|||
@InterfaceStability.Stable
|
||||
public class TimestampsFilter extends FilterBase {
|
||||
|
||||
private final boolean canHint;
|
||||
TreeSet<Long> timestamps;
|
||||
private static final int MAX_LOG_TIMESTAMPS = 5;
|
||||
|
||||
|
@ -49,17 +52,30 @@ public class TimestampsFilter extends FilterBase {
|
|||
// once the timestamps fall below the minTimeStamp.
|
||||
long minTimeStamp = Long.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Constructor for filter that retains only the specified timestamps in the list.
|
||||
* @param timestamps list of timestamps that are wanted.
|
||||
*/
|
||||
public TimestampsFilter(List<Long> timestamps) {
|
||||
this(timestamps, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for filter that retains only those
|
||||
* cells whose timestamp (version) is in the specified
|
||||
* list of timestamps.
|
||||
*
|
||||
* @param timestamps
|
||||
* @param timestamps list of timestamps that are wanted.
|
||||
* @param canHint should the filter provide a seek hint? This can skip
|
||||
* past delete tombstones, so it should only be used when that
|
||||
* is not an issue (no deletes, or it does not matter if deleted data
|
||||
* becomes visible again)
|
||||
*/
|
||||
public TimestampsFilter(List<Long> timestamps) {
|
||||
public TimestampsFilter(List<Long> timestamps, boolean canHint) {
|
||||
for (Long timestamp : timestamps) {
|
||||
Preconditions.checkArgument(timestamp >= 0, "must be positive %s", timestamp);
|
||||
}
|
||||
this.canHint = canHint;
|
||||
this.timestamps = new TreeSet<Long>(timestamps);
|
||||
init();
|
||||
}
|
||||
|
@ -96,7 +112,41 @@ public class TimestampsFilter extends FilterBase {
|
|||
// to be lesser than all of the other values.
|
||||
return ReturnCode.NEXT_COL;
|
||||
}
|
||||
return ReturnCode.SKIP;
|
||||
return canHint ? ReturnCode.SEEK_NEXT_USING_HINT : ReturnCode.SKIP;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Pick the next cell that the scanner should seek to. Since this can skip any number of cells,
|
||||
* any of which can be a delete, this can resurrect old data.
|
||||
*
|
||||
* The method will only be used if canHint was set to true while creating the filter.
|
||||
*
|
||||
* @throws IOException This will never happen.
|
||||
*/
|
||||
public Cell getNextCellHint(Cell currentCell) throws IOException {
|
||||
if (!canHint) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Long nextTimestampObject = timestamps.lower(currentCell.getTimestamp());
|
||||
|
||||
if (nextTimestampObject == null) {
|
||||
// This should only happen if the current column's
|
||||
// timestamp is at or below the lowest one in the set.
|
||||
//
|
||||
// It should never happen as the filterKeyValue should return NEXT_COL
|
||||
// but it's always better to be extra safe and protect against future
|
||||
// behavioral changes.
|
||||
|
||||
return KeyValueUtil.createLastOnRowCol(currentCell);
|
||||
}
|
||||
|
||||
// Since we know the nextTimestampObject isn't null here there must still be
|
||||
// timestamps that can be included. Cast the Long to a long and return
|
||||
// a cell with the current row/cf/col and the next found timestamp.
|
||||
long nextTimestamp = nextTimestampObject;
|
||||
return KeyValueUtil.createFirstOnRowColTS(currentCell, nextTimestamp);
|
||||
}
|
||||
|
||||
// Override here explicitly as the method in super class FilterBase might do a KeyValue recreate.
|
||||
|
@ -118,20 +168,21 @@ public class TimestampsFilter extends FilterBase {
|
|||
/**
|
||||
* @return The filter serialized using pb
|
||||
*/
|
||||
public byte [] toByteArray() {
|
||||
public byte[] toByteArray() {
|
||||
FilterProtos.TimestampsFilter.Builder builder =
|
||||
FilterProtos.TimestampsFilter.newBuilder();
|
||||
builder.addAllTimestamps(this.timestamps);
|
||||
builder.setCanHint(canHint);
|
||||
return builder.build().toByteArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param pbBytes A pb serialized {@link TimestampsFilter} instance
|
||||
*
|
||||
* @return An instance of {@link TimestampsFilter} made from <code>bytes</code>
|
||||
* @throws DeserializationException
|
||||
* @see #toByteArray
|
||||
*/
|
||||
public static TimestampsFilter parseFrom(final byte [] pbBytes)
|
||||
public static TimestampsFilter parseFrom(final byte[] pbBytes)
|
||||
throws DeserializationException {
|
||||
FilterProtos.TimestampsFilter proto;
|
||||
try {
|
||||
|
@ -139,7 +190,8 @@ public class TimestampsFilter extends FilterBase {
|
|||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new DeserializationException(e);
|
||||
}
|
||||
return new TimestampsFilter(proto.getTimestampsList());
|
||||
return new TimestampsFilter(proto.getTimestampsList(),
|
||||
proto.hasCanHint() && proto.getCanHint());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -175,7 +227,7 @@ public class TimestampsFilter extends FilterBase {
|
|||
}
|
||||
}
|
||||
|
||||
return String.format("%s (%d/%d): [%s]", this.getClass().getSimpleName(),
|
||||
count, this.timestamps.size(), tsList.toString());
|
||||
return String.format("%s (%d/%d): [%s] canHint: [%b]", this.getClass().getSimpleName(),
|
||||
count, this.timestamps.size(), tsList.toString(), canHint);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13923,6 +13923,16 @@ public final class FilterProtos {
|
|||
* <code>repeated int64 timestamps = 1 [packed = true];</code>
|
||||
*/
|
||||
long getTimestamps(int index);
|
||||
|
||||
// optional bool can_hint = 2;
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
boolean hasCanHint();
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
boolean getCanHint();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code hbase.pb.TimestampsFilter}
|
||||
|
@ -13996,6 +14006,11 @@ public final class FilterProtos {
|
|||
input.popLimit(limit);
|
||||
break;
|
||||
}
|
||||
case 16: {
|
||||
bitField0_ |= 0x00000001;
|
||||
canHint_ = input.readBool();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -14038,6 +14053,7 @@ public final class FilterProtos {
|
|||
return PARSER;
|
||||
}
|
||||
|
||||
private int bitField0_;
|
||||
// repeated int64 timestamps = 1 [packed = true];
|
||||
public static final int TIMESTAMPS_FIELD_NUMBER = 1;
|
||||
private java.util.List<java.lang.Long> timestamps_;
|
||||
|
@ -14062,8 +14078,25 @@ public final class FilterProtos {
|
|||
}
|
||||
private int timestampsMemoizedSerializedSize = -1;
|
||||
|
||||
// optional bool can_hint = 2;
|
||||
public static final int CAN_HINT_FIELD_NUMBER = 2;
|
||||
private boolean canHint_;
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public boolean hasCanHint() {
|
||||
return ((bitField0_ & 0x00000001) == 0x00000001);
|
||||
}
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public boolean getCanHint() {
|
||||
return canHint_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
timestamps_ = java.util.Collections.emptyList();
|
||||
canHint_ = false;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -14084,6 +14117,9 @@ public final class FilterProtos {
|
|||
for (int i = 0; i < timestamps_.size(); i++) {
|
||||
output.writeInt64NoTag(timestamps_.get(i));
|
||||
}
|
||||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
output.writeBool(2, canHint_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -14107,6 +14143,10 @@ public final class FilterProtos {
|
|||
}
|
||||
timestampsMemoizedSerializedSize = dataSize;
|
||||
}
|
||||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBoolSize(2, canHint_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -14132,6 +14172,11 @@ public final class FilterProtos {
|
|||
boolean result = true;
|
||||
result = result && getTimestampsList()
|
||||
.equals(other.getTimestampsList());
|
||||
result = result && (hasCanHint() == other.hasCanHint());
|
||||
if (hasCanHint()) {
|
||||
result = result && (getCanHint()
|
||||
== other.getCanHint());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -14149,6 +14194,10 @@ public final class FilterProtos {
|
|||
hash = (37 * hash) + TIMESTAMPS_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getTimestampsList().hashCode();
|
||||
}
|
||||
if (hasCanHint()) {
|
||||
hash = (37 * hash) + CAN_HINT_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashBoolean(getCanHint());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -14260,6 +14309,8 @@ public final class FilterProtos {
|
|||
super.clear();
|
||||
timestamps_ = java.util.Collections.emptyList();
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
canHint_ = false;
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -14287,11 +14338,17 @@ public final class FilterProtos {
|
|||
public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter buildPartial() {
|
||||
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter(this);
|
||||
int from_bitField0_ = bitField0_;
|
||||
int to_bitField0_ = 0;
|
||||
if (((bitField0_ & 0x00000001) == 0x00000001)) {
|
||||
timestamps_ = java.util.Collections.unmodifiableList(timestamps_);
|
||||
bitField0_ = (bitField0_ & ~0x00000001);
|
||||
}
|
||||
result.timestamps_ = timestamps_;
|
||||
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
|
||||
to_bitField0_ |= 0x00000001;
|
||||
}
|
||||
result.canHint_ = canHint_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
}
|
||||
|
@ -14317,6 +14374,9 @@ public final class FilterProtos {
|
|||
}
|
||||
onChanged();
|
||||
}
|
||||
if (other.hasCanHint()) {
|
||||
setCanHint(other.getCanHint());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -14410,6 +14470,39 @@ public final class FilterProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional bool can_hint = 2;
|
||||
private boolean canHint_ ;
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public boolean hasCanHint() {
|
||||
return ((bitField0_ & 0x00000002) == 0x00000002);
|
||||
}
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public boolean getCanHint() {
|
||||
return canHint_;
|
||||
}
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public Builder setCanHint(boolean value) {
|
||||
bitField0_ |= 0x00000002;
|
||||
canHint_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional bool can_hint = 2;</code>
|
||||
*/
|
||||
public Builder clearCanHint() {
|
||||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
canHint_ = false;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:hbase.pb.TimestampsFilter)
|
||||
}
|
||||
|
||||
|
@ -17503,18 +17596,18 @@ public final class FilterProtos {
|
|||
"ompareType\022(\n\ncomparator\030\004 \002(\0132\024.hbase.p" +
|
||||
"b.Comparator\022\031\n\021filter_if_missing\030\005 \001(\010\022" +
|
||||
"\033\n\023latest_version_only\030\006 \001(\010\".\n\nSkipFilt" +
|
||||
"er\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"*\n\020" +
|
||||
"er\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"<\n\020" +
|
||||
"TimestampsFilter\022\026\n\ntimestamps\030\001 \003(\003B\002\020\001" +
|
||||
"\">\n\013ValueFilter\022/\n\016compare_filter\030\001 \002(\0132" +
|
||||
"\027.hbase.pb.CompareFilter\"4\n\020WhileMatchFi" +
|
||||
"lter\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"\021" +
|
||||
"\n\017FilterAllFilter\"h\n\010RowRange\022\021\n\tstart_r",
|
||||
"ow\030\001 \001(\014\022\033\n\023start_row_inclusive\030\002 \001(\010\022\020\n" +
|
||||
"\010stop_row\030\003 \001(\014\022\032\n\022stop_row_inclusive\030\004 " +
|
||||
"\001(\010\"A\n\023MultiRowRangeFilter\022*\n\016row_range_" +
|
||||
"list\030\001 \003(\0132\022.hbase.pb.RowRangeBB\n*org.ap" +
|
||||
"ache.hadoop.hbase.protobuf.generatedB\014Fi" +
|
||||
"lterProtosH\001\210\001\001\240\001\001"
|
||||
"\022\020\n\010can_hint\030\002 \001(\010\">\n\013ValueFilter\022/\n\016com" +
|
||||
"pare_filter\030\001 \002(\0132\027.hbase.pb.CompareFilt" +
|
||||
"er\"4\n\020WhileMatchFilter\022 \n\006filter\030\001 \002(\0132\020" +
|
||||
".hbase.pb.Filter\"\021\n\017FilterAllFilter\"h\n\010R",
|
||||
"owRange\022\021\n\tstart_row\030\001 \001(\014\022\033\n\023start_row_" +
|
||||
"inclusive\030\002 \001(\010\022\020\n\010stop_row\030\003 \001(\014\022\032\n\022sto" +
|
||||
"p_row_inclusive\030\004 \001(\010\"A\n\023MultiRowRangeFi" +
|
||||
"lter\022*\n\016row_range_list\030\001 \003(\0132\022.hbase.pb." +
|
||||
"RowRangeBB\n*org.apache.hadoop.hbase.prot" +
|
||||
"obuf.generatedB\014FilterProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -17670,7 +17763,7 @@ public final class FilterProtos {
|
|||
internal_static_hbase_pb_TimestampsFilter_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_hbase_pb_TimestampsFilter_descriptor,
|
||||
new java.lang.String[] { "Timestamps", });
|
||||
new java.lang.String[] { "Timestamps", "CanHint", });
|
||||
internal_static_hbase_pb_ValueFilter_descriptor =
|
||||
getDescriptor().getMessageTypes().get(25);
|
||||
internal_static_hbase_pb_ValueFilter_fieldAccessorTable = new
|
||||
|
|
|
@ -146,6 +146,7 @@ message SkipFilter {
|
|||
|
||||
message TimestampsFilter {
|
||||
repeated int64 timestamps = 1 [packed=true];
|
||||
optional bool can_hint = 2;
|
||||
}
|
||||
|
||||
message ValueFilter {
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.filter.TimestampsFilter;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestTimestampFilterSeekHint {
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
|
||||
private final static String RK = "myRK";
|
||||
private final static byte[] RK_BYTES = Bytes.toBytes(RK);
|
||||
|
||||
private final static String FAMILY = "D";
|
||||
private final static byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
|
||||
|
||||
private final static String QUAL = "0";
|
||||
private final static byte[] QUAL_BYTES = Bytes.toBytes(QUAL);
|
||||
|
||||
public static final int MAX_VERSIONS = 50000;
|
||||
private HRegion region;
|
||||
private int regionCount = 0;
|
||||
|
||||
@Test
|
||||
public void testGetSeek() throws IOException {
|
||||
StoreFileScanner.instrument();
|
||||
prepareRegion();
|
||||
|
||||
Get g = new Get(RK_BYTES);
|
||||
final TimestampsFilter timestampsFilter = new TimestampsFilter(ImmutableList.of(5L), true);
|
||||
g.setFilter(timestampsFilter);
|
||||
final long initialSeekCount = StoreFileScanner.getSeekCount();
|
||||
region.get(g);
|
||||
final long finalSeekCount = StoreFileScanner.getSeekCount();
|
||||
|
||||
/*
|
||||
Make sure there's more than one seek (the check requires at least three extra),
|
||||
i.e. one seek to get to the row, and one to get to the time.
|
||||
*/
|
||||
assertTrue(finalSeekCount >= initialSeekCount + 3 );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDoesntSeekWithNoHint() throws IOException {
|
||||
StoreFileScanner.instrument();
|
||||
prepareRegion();
|
||||
|
||||
Get g = new Get(RK_BYTES);
|
||||
g.setFilter(new TimestampsFilter(ImmutableList.of(5L)));
|
||||
final long initialSeekCount = StoreFileScanner.getSeekCount();
|
||||
region.get(g);
|
||||
final long finalSeekCount = StoreFileScanner.getSeekCount();
|
||||
|
||||
assertTrue(finalSeekCount >= initialSeekCount );
|
||||
assertTrue(finalSeekCount < initialSeekCount + 3);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void prepareRegion() throws IOException {
|
||||
region =
|
||||
TEST_UTIL.createTestRegion("TestTimestampFilterSeekHint" + regionCount++,
|
||||
new HColumnDescriptor(FAMILY)
|
||||
.setBlocksize(1024)
|
||||
.setMaxVersions(MAX_VERSIONS)
|
||||
);
|
||||
|
||||
for (long i = 0; i <MAX_VERSIONS - 2; i++) {
|
||||
Put p = new Put(RK_BYTES, i);
|
||||
p.addColumn(FAMILY_BYTES, QUAL_BYTES, Bytes.toBytes(RandomStringUtils.randomAlphabetic(255)));
|
||||
region.put(p);
|
||||
}
|
||||
region.flush(true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue