HBASE-15475 Allow TimestampsFilter to provide a seek hint

Summary:
Allow TestTimestampFilterSeekHint to provide a seek next hint.
This can be incorrect as it might skip deletes. However it can
make things much much faster.

Test Plan: Added a unit test.

Differential Revision: https://reviews.facebook.net/D55617
This commit is contained in:
Elliott Clark 2016-03-16 21:14:55 -07:00
parent d7a4499dfc
commit 925c185969
6 changed files with 311 additions and 40 deletions

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -44,6 +45,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
@InterfaceStability.Stable
public class TimestampsFilter extends FilterBase {
private final boolean canHint;
TreeSet<Long> timestamps;
private static final int MAX_LOG_TIMESTAMPS = 5;
@ -51,17 +53,30 @@ public class TimestampsFilter extends FilterBase {
// once the timestamps fall below the minTimeStamp.
long minTimeStamp = Long.MAX_VALUE;
/**
* Constructor for filter that retains only the specified timestamps in the list.
* @param timestamps
*/
public TimestampsFilter(List<Long> timestamps) {
this(timestamps, false);
}
/**
* Constructor for filter that retains only those
* cells whose timestamp (version) is in the specified
* list of timestamps.
*
* @param timestamps
* @param timestamps list of timestamps that are wanted.
* @param canHint should the filter provide a seek hint? This can skip
* past delete tombstones, so it should only be used when that
* is not an issue ( no deletes, or don't care if data
* becomes visible)
*/
public TimestampsFilter(List<Long> timestamps) {
public TimestampsFilter(List<Long> timestamps, boolean canHint) {
for (Long timestamp : timestamps) {
Preconditions.checkArgument(timestamp >= 0, "must be positive %s", timestamp);
}
this.canHint = canHint;
this.timestamps = new TreeSet<Long>(timestamps);
init();
}
@ -104,7 +119,41 @@ public class TimestampsFilter extends FilterBase {
// to be lesser than all of the other values.
return ReturnCode.NEXT_COL;
}
return ReturnCode.SKIP;
return canHint ? ReturnCode.SEEK_NEXT_USING_HINT : ReturnCode.SKIP;
}
/**
* Pick the next cell that the scanner should seek to. Since this can skip any number of cells
* any of which can be a delete this can resurect old data.
*
* The method will only be used if canHint was set to true while creating the filter.
*
* @throws IOException This will never happen.
*/
public Cell getNextCellHint(Cell currentCell) throws IOException {
if (!canHint) {
return null;
}
Long nextTimestampObject = timestamps.lower(currentCell.getTimestamp());
if (nextTimestampObject == null) {
// This should only happen if the current column's
// timestamp is below the last one in the list.
//
// It should never happen as the filterKeyValue should return NEXT_COL
// but it's always better to be extra safe and protect against future
// behavioral changes.
return CellUtil.createLastOnRowCol(currentCell);
}
// Since we know the nextTimestampObject isn't null here there must still be
// timestamps that can be included. Cast the Long to a long and return the
// a cell with the current row/cf/col and the next found timestamp.
long nextTimestamp = nextTimestampObject;
return CellUtil.createFirstOnRowColTS(currentCell, nextTimestamp);
}
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
@ -119,20 +168,21 @@ public class TimestampsFilter extends FilterBase {
/**
* @return The filter serialized using pb
*/
public byte [] toByteArray() {
public byte[] toByteArray() {
FilterProtos.TimestampsFilter.Builder builder =
FilterProtos.TimestampsFilter.newBuilder();
builder.addAllTimestamps(this.timestamps);
builder.setCanHint(canHint);
return builder.build().toByteArray();
}
/**
* @param pbBytes A pb serialized {@link TimestampsFilter} instance
*
* @return An instance of {@link TimestampsFilter} made from <code>bytes</code>
* @throws DeserializationException
* @see #toByteArray
*/
public static TimestampsFilter parseFrom(final byte [] pbBytes)
public static TimestampsFilter parseFrom(final byte[] pbBytes)
throws DeserializationException {
FilterProtos.TimestampsFilter proto;
try {
@ -140,7 +190,8 @@ public class TimestampsFilter extends FilterBase {
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return new TimestampsFilter(proto.getTimestampsList());
return new TimestampsFilter(proto.getTimestampsList(),
proto.hasCanHint() && proto.getCanHint());
}
/**
@ -176,7 +227,7 @@ public class TimestampsFilter extends FilterBase {
}
}
return String.format("%s (%d/%d): [%s]", this.getClass().getSimpleName(),
count, this.timestamps.size(), tsList.toString());
return String.format("%s (%d/%d): [%s] canHint: [%b]", this.getClass().getSimpleName(),
count, this.timestamps.size(), tsList.toString(), canHint);
}
}

View File

@ -13923,6 +13923,16 @@ public final class FilterProtos {
* <code>repeated int64 timestamps = 1 [packed = true];</code>
*/
long getTimestamps(int index);
// optional bool can_hint = 2;
/**
* <code>optional bool can_hint = 2;</code>
*/
boolean hasCanHint();
/**
* <code>optional bool can_hint = 2;</code>
*/
boolean getCanHint();
}
/**
* Protobuf type {@code hbase.pb.TimestampsFilter}
@ -13996,6 +14006,11 @@ public final class FilterProtos {
input.popLimit(limit);
break;
}
case 16: {
bitField0_ |= 0x00000001;
canHint_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -14038,6 +14053,7 @@ public final class FilterProtos {
return PARSER;
}
private int bitField0_;
// repeated int64 timestamps = 1 [packed = true];
public static final int TIMESTAMPS_FIELD_NUMBER = 1;
private java.util.List<java.lang.Long> timestamps_;
@ -14062,8 +14078,25 @@ public final class FilterProtos {
}
private int timestampsMemoizedSerializedSize = -1;
// optional bool can_hint = 2;
public static final int CAN_HINT_FIELD_NUMBER = 2;
private boolean canHint_;
/**
* <code>optional bool can_hint = 2;</code>
*/
public boolean hasCanHint() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>optional bool can_hint = 2;</code>
*/
public boolean getCanHint() {
return canHint_;
}
private void initFields() {
timestamps_ = java.util.Collections.emptyList();
canHint_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -14084,6 +14117,9 @@ public final class FilterProtos {
for (int i = 0; i < timestamps_.size(); i++) {
output.writeInt64NoTag(timestamps_.get(i));
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBool(2, canHint_);
}
getUnknownFields().writeTo(output);
}
@ -14107,6 +14143,10 @@ public final class FilterProtos {
}
timestampsMemoizedSerializedSize = dataSize;
}
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(2, canHint_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -14132,6 +14172,11 @@ public final class FilterProtos {
boolean result = true;
result = result && getTimestampsList()
.equals(other.getTimestampsList());
result = result && (hasCanHint() == other.hasCanHint());
if (hasCanHint()) {
result = result && (getCanHint()
== other.getCanHint());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -14149,6 +14194,10 @@ public final class FilterProtos {
hash = (37 * hash) + TIMESTAMPS_FIELD_NUMBER;
hash = (53 * hash) + getTimestampsList().hashCode();
}
if (hasCanHint()) {
hash = (37 * hash) + CAN_HINT_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getCanHint());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -14260,6 +14309,8 @@ public final class FilterProtos {
super.clear();
timestamps_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000001);
canHint_ = false;
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@ -14287,11 +14338,17 @@ public final class FilterProtos {
public org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter result = new org.apache.hadoop.hbase.protobuf.generated.FilterProtos.TimestampsFilter(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
timestamps_ = java.util.Collections.unmodifiableList(timestamps_);
bitField0_ = (bitField0_ & ~0x00000001);
}
result.timestamps_ = timestamps_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000001;
}
result.canHint_ = canHint_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@ -14317,6 +14374,9 @@ public final class FilterProtos {
}
onChanged();
}
if (other.hasCanHint()) {
setCanHint(other.getCanHint());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -14410,6 +14470,39 @@ public final class FilterProtos {
return this;
}
// optional bool can_hint = 2;
private boolean canHint_ ;
/**
* <code>optional bool can_hint = 2;</code>
*/
public boolean hasCanHint() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>optional bool can_hint = 2;</code>
*/
public boolean getCanHint() {
return canHint_;
}
/**
* <code>optional bool can_hint = 2;</code>
*/
public Builder setCanHint(boolean value) {
bitField0_ |= 0x00000002;
canHint_ = value;
onChanged();
return this;
}
/**
* <code>optional bool can_hint = 2;</code>
*/
public Builder clearCanHint() {
bitField0_ = (bitField0_ & ~0x00000002);
canHint_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.TimestampsFilter)
}
@ -17503,18 +17596,18 @@ public final class FilterProtos {
"ompareType\022(\n\ncomparator\030\004 \002(\0132\024.hbase.p" +
"b.Comparator\022\031\n\021filter_if_missing\030\005 \001(\010\022" +
"\033\n\023latest_version_only\030\006 \001(\010\".\n\nSkipFilt" +
"er\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"*\n\020" +
"er\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"<\n\020" +
"TimestampsFilter\022\026\n\ntimestamps\030\001 \003(\003B\002\020\001" +
"\">\n\013ValueFilter\022/\n\016compare_filter\030\001 \002(\0132" +
"\027.hbase.pb.CompareFilter\"4\n\020WhileMatchFi" +
"lter\022 \n\006filter\030\001 \002(\0132\020.hbase.pb.Filter\"\021" +
"\n\017FilterAllFilter\"h\n\010RowRange\022\021\n\tstart_r",
"ow\030\001 \001(\014\022\033\n\023start_row_inclusive\030\002 \001(\010\022\020\n" +
"\010stop_row\030\003 \001(\014\022\032\n\022stop_row_inclusive\030\004 " +
"\001(\010\"A\n\023MultiRowRangeFilter\022*\n\016row_range_" +
"list\030\001 \003(\0132\022.hbase.pb.RowRangeBB\n*org.ap" +
"ache.hadoop.hbase.protobuf.generatedB\014Fi" +
"lterProtosH\001\210\001\001\240\001\001"
"\022\020\n\010can_hint\030\002 \001(\010\">\n\013ValueFilter\022/\n\016com" +
"pare_filter\030\001 \002(\0132\027.hbase.pb.CompareFilt" +
"er\"4\n\020WhileMatchFilter\022 \n\006filter\030\001 \002(\0132\020" +
".hbase.pb.Filter\"\021\n\017FilterAllFilter\"h\n\010R",
"owRange\022\021\n\tstart_row\030\001 \001(\014\022\033\n\023start_row_" +
"inclusive\030\002 \001(\010\022\020\n\010stop_row\030\003 \001(\014\022\032\n\022sto" +
"p_row_inclusive\030\004 \001(\010\"A\n\023MultiRowRangeFi" +
"lter\022*\n\016row_range_list\030\001 \003(\0132\022.hbase.pb." +
"RowRangeBB\n*org.apache.hadoop.hbase.prot" +
"obuf.generatedB\014FilterProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -17670,7 +17763,7 @@ public final class FilterProtos {
internal_static_hbase_pb_TimestampsFilter_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_TimestampsFilter_descriptor,
new java.lang.String[] { "Timestamps", });
new java.lang.String[] { "Timestamps", "CanHint", });
internal_static_hbase_pb_ValueFilter_descriptor =
getDescriptor().getMessageTypes().get(25);
internal_static_hbase_pb_ValueFilter_fieldAccessorTable = new

View File

@ -146,6 +146,7 @@ message SkipFilter {
message TimestampsFilter {
repeated int64 timestamps = 1 [packed=true];
optional bool can_hint = 2;
}
message ValueFilter {

View File

@ -397,6 +397,7 @@ public class TestHFile {
/**
* Make sure the ordinals for our compression algorithms do not change on us.
*/
@Test
public void testCompressionOrdinance() {
assertTrue(Compression.Algorithm.LZO.ordinal() == 0);
assertTrue(Compression.Algorithm.GZ.ordinal() == 1);
@ -405,6 +406,25 @@ public class TestHFile {
assertTrue(Compression.Algorithm.LZ4.ordinal() == 4);
}
@Test
public void testShortMidpointSameQual() {
Cell left = CellUtil.createCell(Bytes.toBytes("a"),
Bytes.toBytes("a"),
Bytes.toBytes("a"),
9,
KeyValue.Type.Maximum.getCode(),
HConstants.EMPTY_BYTE_ARRAY);
Cell right = CellUtil.createCell(Bytes.toBytes("a"),
Bytes.toBytes("a"),
Bytes.toBytes("a"),
11,
KeyValue.Type.Maximum.getCode(),
HConstants.EMPTY_BYTE_ARRAY);
Cell mid = HFileWriterImpl.getMidpoint(CellComparator.COMPARATOR, left, right);
assertTrue(CellComparator.COMPARATOR.compareKeyIgnoresMvcc(left, mid) <= 0);
assertTrue(CellComparator.COMPARATOR.compareKeyIgnoresMvcc(mid, right) == 0);
}
@Test
public void testGetShortMidpoint() {
Cell left = CellUtil.createCell(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes("a"));

View File

@ -0,0 +1,106 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertTrue;
@Category({RegionServerTests.class, LargeTests.class})
public class TestTimestampFilterSeekHint {
private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
private final static String RK = "myRK";
private final static byte[] RK_BYTES = Bytes.toBytes(RK);
private final static String FAMILY = "D";
private final static byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
private final static String QUAL = "0";
private final static byte[] QUAL_BYTES = Bytes.toBytes(QUAL);
public static final int MAX_VERSIONS = 50000;
private HRegion region;
private int regionCount = 0;
@Test
public void testGetSeek() throws IOException {
StoreFileScanner.instrument();
prepareRegion();
Get g = new Get(RK_BYTES);
final TimestampsFilter timestampsFilter = new TimestampsFilter(ImmutableList.of(5L), true);
g.setFilter(timestampsFilter);
final long initialSeekCount = StoreFileScanner.getSeekCount();
region.get(g);
final long finalSeekCount = StoreFileScanner.getSeekCount();
/*
Make sure there's more than one.
Aka one seek to get to the row, and one to get to the time.
*/
assertTrue(finalSeekCount >= initialSeekCount + 3 );
}
@Test
public void testGetDoesntSeekWithNoHint() throws IOException {
StoreFileScanner.instrument();
prepareRegion();
Get g = new Get(RK_BYTES);
g.setFilter(new TimestampsFilter(ImmutableList.of(5L)));
final long initialSeekCount = StoreFileScanner.getSeekCount();
region.get(g);
final long finalSeekCount = StoreFileScanner.getSeekCount();
assertTrue(finalSeekCount >= initialSeekCount );
assertTrue(finalSeekCount < initialSeekCount + 3);
}
@Before
public void prepareRegion() throws IOException {
region =
TEST_UTIL.createTestRegion("TestTimestampFilterSeekHint" + regionCount++,
new HColumnDescriptor(FAMILY)
.setBlocksize(1024)
.setMaxVersions(MAX_VERSIONS)
);
for (long i = 0; i <MAX_VERSIONS - 2; i++) {
Put p = new Put(RK_BYTES, i);
p.addColumn(FAMILY_BYTES, QUAL_BYTES, Bytes.toBytes(RandomStringUtils.randomAlphabetic(255)));
region.put(p);
}
region.flush(true);
}
}