HBASE-1517 Implement inexpensive seek operations in HFile (Pranav via Ryan)
HBASE-2903 ColumnPrefix filtering (Pranav via Ryan) HBASE-2904 Smart seeking using filters (Pranav via Ryan) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@985412 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d80de85a69
commit
4784ec1b57
|
@ -838,6 +838,9 @@ Release 0.21.0 - Unreleased
|
||||||
(Alex Newman via Stack)
|
(Alex Newman via Stack)
|
||||||
HBASE-1660 HBASE-1660 script to handle rolling restarts
|
HBASE-1660 HBASE-1660 script to handle rolling restarts
|
||||||
(Nicolas Spiegelberg via Stack)
|
(Nicolas Spiegelberg via Stack)
|
||||||
|
HBASE-1517 Implement inexpensive seek operations in HFile (Pranav via Ryan)
|
||||||
|
HBASE-2903 ColumnPrefix filtering (Pranav via Ryan)
|
||||||
|
HBASE-2904 Smart seeking using filters (Pranav via Ryan)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -1609,6 +1609,31 @@ public class KeyValue implements Writable, HeapSize {
|
||||||
return new KeyValue(row, f, q, ts, Type.Maximum);
|
return new KeyValue(row, f, q, ts, Type.Maximum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a KeyValue for the specified row, family and qualifier that would be
|
||||||
|
* smaller than all other possible KeyValues that have the same row,
|
||||||
|
* family, qualifier.
|
||||||
|
* Used for seeking.
|
||||||
|
* @param row row key
|
||||||
|
* @param roffset row offset
|
||||||
|
* @param rlength row length
|
||||||
|
* @param family family name
|
||||||
|
* @param foffset family offset
|
||||||
|
* @param flength family length
|
||||||
|
* @param qualifier column qualifier
|
||||||
|
* @param qoffset qualifier offset
|
||||||
|
* @param qlength qualifier length
|
||||||
|
* @return First possible key on passed Row, Family, Qualifier.
|
||||||
|
*/
|
||||||
|
public static KeyValue createFirstOnRow(final byte [] row,
|
||||||
|
final int roffset, final int rlength, final byte [] family,
|
||||||
|
final int foffset, final int flength, final byte [] qualifier,
|
||||||
|
final int qoffset, final int qlength) {
|
||||||
|
return new KeyValue(row, roffset, rlength, family,
|
||||||
|
foffset, flength, qualifier, qoffset, qlength,
|
||||||
|
HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param b
|
* @param b
|
||||||
* @return A KeyValue made of a byte array that holds the key-only part.
|
* @return A KeyValue made of a byte array that holds the key-only part.
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.filter;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.DataInput;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This filter is used for selecting only those keys with columns that matches
|
||||||
|
* a particular prefix. For example, if prefix is 'an', it will pass keys will
|
||||||
|
* columns like 'and', 'anti' but not keys with columns like 'ball', 'act'.
|
||||||
|
*/
|
||||||
|
public class ColumnPrefixFilter extends FilterBase {
|
||||||
|
protected byte [] prefix = null;
|
||||||
|
|
||||||
|
public ColumnPrefixFilter() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ColumnPrefixFilter(final byte [] prefix) {
|
||||||
|
this.prefix = prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getPrefix() {
|
||||||
|
return prefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReturnCode filterKeyValue(KeyValue kv) {
|
||||||
|
if (this.prefix == null || kv.getBuffer() == null) {
|
||||||
|
return ReturnCode.INCLUDE;
|
||||||
|
} else {
|
||||||
|
return filterColumn(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReturnCode filterColumn(byte[] buffer, int qualifierOffset, int qualifierLength) {
|
||||||
|
if (qualifierLength < prefix.length) {
|
||||||
|
int cmp = Bytes.compareTo(buffer, qualifierOffset, qualifierLength, this.prefix, 0,
|
||||||
|
qualifierLength);
|
||||||
|
if (cmp <= 0) {
|
||||||
|
return ReturnCode.SEEK_NEXT_USING_HINT;
|
||||||
|
} else {
|
||||||
|
return ReturnCode.NEXT_ROW;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int cmp = Bytes.compareTo(buffer, qualifierOffset, this.prefix.length, this.prefix, 0,
|
||||||
|
this.prefix.length);
|
||||||
|
if (cmp < 0) {
|
||||||
|
return ReturnCode.SEEK_NEXT_USING_HINT;
|
||||||
|
} else if (cmp > 0) {
|
||||||
|
return ReturnCode.NEXT_ROW;
|
||||||
|
} else {
|
||||||
|
return ReturnCode.INCLUDE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, this.prefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.prefix = Bytes.readByteArray(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public KeyValue getNextKeyHint(KeyValue kv) {
|
||||||
|
return KeyValue.createFirstOnRow(
|
||||||
|
kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
|
||||||
|
kv.getFamilyOffset(), kv.getFamilyLength(), prefix, 0, prefix.length);
|
||||||
|
}
|
||||||
|
}
|
|
@ -110,7 +110,11 @@ public interface Filter extends Writable {
|
||||||
* still be called.
|
* still be called.
|
||||||
*/
|
*/
|
||||||
NEXT_ROW,
|
NEXT_ROW,
|
||||||
}
|
/**
|
||||||
|
* Seek to next key which is given as hint by the filter.
|
||||||
|
*/
|
||||||
|
SEEK_NEXT_USING_HINT,
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chance to alter the list of keyvalues to be submitted.
|
* Chance to alter the list of keyvalues to be submitted.
|
||||||
|
@ -136,4 +140,13 @@ public interface Filter extends Writable {
|
||||||
*/
|
*/
|
||||||
public boolean filterRow();
|
public boolean filterRow();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the filter returns the match code SEEK_NEXT_USING_HINT, then
|
||||||
|
* it should also tell which is the next key it must seek to.
|
||||||
|
* After receiving the match code SEEK_NEXT_USING_HINT, the QueryMatcher would
|
||||||
|
* call this function to find out which key it must next seek to.
|
||||||
|
* @return KeyValue which must be next seeked. return null if the filter is
|
||||||
|
* not sure which key to seek to next.
|
||||||
|
*/
|
||||||
|
public KeyValue getNextKeyHint(KeyValue currentKV);
|
||||||
}
|
}
|
|
@ -110,4 +110,15 @@ public abstract class FilterBase implements Filter {
|
||||||
public boolean filterRow() {
|
public boolean filterRow() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filters that are not sure which key must be next seeked to, can inherit
|
||||||
|
* this implementation that, by default, returns a null KeyValue.
|
||||||
|
*
|
||||||
|
* @inheritDoc
|
||||||
|
*/
|
||||||
|
public KeyValue getNextKeyHint(KeyValue currentKV) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -245,4 +245,9 @@ public class FilterList implements Filter {
|
||||||
HbaseObjectWritable.writeObject(out, filter, Writable.class, conf);
|
HbaseObjectWritable.writeObject(out, filter, Writable.class, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public KeyValue getNextKeyHint(KeyValue currentKV) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -201,6 +201,37 @@ public class HalfStoreFileReader extends StoreFile.Reader {
|
||||||
return delegate.seekTo(key, offset, length);
|
return delegate.seekTo(key, offset, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int reseekTo(byte[] key) throws IOException {
|
||||||
|
return reseekTo(key, 0, key.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int reseekTo(byte[] key, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
|
//This function is identical to the corresponding seekTo function except
|
||||||
|
//that we call reseekTo (and not seekTo) on the delegate.
|
||||||
|
if (top) {
|
||||||
|
if (getComparator().compare(key, offset, length, splitkey, 0,
|
||||||
|
splitkey.length) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (getComparator().compare(key, offset, length, splitkey, 0,
|
||||||
|
splitkey.length) >= 0) {
|
||||||
|
// we would place the scanner in the second half.
|
||||||
|
// it might be an error to return false here ever...
|
||||||
|
boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
|
||||||
|
if (!res) {
|
||||||
|
throw new IOException("Seeking for a key in bottom of file, but" +
|
||||||
|
" key exists in top of file, failed on seekBefore(midkey)");
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return delegate.reseekTo(key, offset, length);
|
||||||
|
}
|
||||||
|
|
||||||
public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
|
public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
|
||||||
return this.delegate.getReader();
|
return this.delegate.getReader();
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
|
||||||
addToMap(WritableByteArrayComparable.class, code++);
|
addToMap(WritableByteArrayComparable.class, code++);
|
||||||
addToMap(FirstKeyOnlyFilter.class, code++);
|
addToMap(FirstKeyOnlyFilter.class, code++);
|
||||||
addToMap(DependentColumnFilter.class, code++);
|
addToMap(DependentColumnFilter.class, code++);
|
||||||
|
addToMap(ColumnPrefixFilter.class, code++);
|
||||||
|
|
||||||
addToMap(Delete [].class, code++);
|
addToMap(Delete [].class, code++);
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.SortedSet;
|
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.CommandLineParser;
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
|
@ -1256,13 +1255,37 @@ public class HFile {
|
||||||
return seekTo(key, 0, key.length);
|
return seekTo(key, 0, key.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int seekTo(byte[] key, int offset, int length) throws IOException {
|
public int seekTo(byte[] key, int offset, int length) throws IOException {
|
||||||
int b = reader.blockContainingKey(key, offset, length);
|
int b = reader.blockContainingKey(key, offset, length);
|
||||||
if (b < 0) return -1; // falls before the beginning of the file! :-(
|
if (b < 0) return -1; // falls before the beginning of the file! :-(
|
||||||
// Avoid re-reading the same block (that'd be dumb).
|
// Avoid re-reading the same block (that'd be dumb).
|
||||||
loadBlock(b);
|
loadBlock(b, true);
|
||||||
|
return blockSeek(key, offset, length, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int reseekTo(byte [] key) throws IOException {
|
||||||
|
return reseekTo(key, 0, key.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int reseekTo(byte[] key, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
if (this.block != null && this.currKeyLen != 0) {
|
||||||
|
ByteBuffer bb = getKey();
|
||||||
|
int compared = this.reader.comparator.compare(key, offset, length,
|
||||||
|
bb.array(), bb.arrayOffset(), bb.limit());
|
||||||
|
if (compared < 1) {
|
||||||
|
//If the required key is less than or equal to current key, then
|
||||||
|
//don't do anything.
|
||||||
|
return compared;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int b = reader.blockContainingKey(key, offset, length);
|
||||||
|
if (b < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
loadBlock(b, false);
|
||||||
return blockSeek(key, offset, length, false);
|
return blockSeek(key, offset, length, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1336,7 +1359,7 @@ public class HFile {
|
||||||
b--;
|
b--;
|
||||||
// TODO shortcut: seek forward in this block to the last key of the block.
|
// TODO shortcut: seek forward in this block to the last key of the block.
|
||||||
}
|
}
|
||||||
loadBlock(b);
|
loadBlock(b, true);
|
||||||
blockSeek(key, offset, length, true);
|
blockSeek(key, offset, length, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1377,7 +1400,7 @@ public class HFile {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void loadBlock(int bloc) throws IOException {
|
private void loadBlock(int bloc, boolean rewind) throws IOException {
|
||||||
if (block == null) {
|
if (block == null) {
|
||||||
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
|
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
|
||||||
currBlock = bloc;
|
currBlock = bloc;
|
||||||
|
@ -1389,7 +1412,13 @@ public class HFile {
|
||||||
blockFetches++;
|
blockFetches++;
|
||||||
} else {
|
} else {
|
||||||
// we are already in the same block, just rewind to seek again.
|
// we are already in the same block, just rewind to seek again.
|
||||||
block.rewind();
|
if (rewind) {
|
||||||
|
block.rewind();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
//Go back by (size of rowlength + size of valuelength) = 8 bytes
|
||||||
|
block.position(block.position()-8);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.SortedSet;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
|
||||||
|
@ -47,12 +46,37 @@ public interface HFileScanner {
|
||||||
* @return -1, if key < k[0], no position;
|
* @return -1, if key < k[0], no position;
|
||||||
* 0, such that k[i] = key and scanner is left in position i; and
|
* 0, such that k[i] = key and scanner is left in position i; and
|
||||||
* 1, such that k[i] < key, and scanner is left in position i.
|
* 1, such that k[i] < key, and scanner is left in position i.
|
||||||
* Furthermore, there may be a k[i+1], such that k[i] < key < k[i+1]
|
* The scanner will position itself between k[i] and k[i+1] where
|
||||||
* but there may not be a k[i+1], and next() will return false (EOF).
|
* k[i] < key <= k[i+1].
|
||||||
|
* If there is no key k[i+1] greater than or equal to the input key, then the
|
||||||
|
* scanner will position itself at the end of the file and next() will return
|
||||||
|
* false when it is called.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public int seekTo(byte[] key) throws IOException;
|
public int seekTo(byte[] key) throws IOException;
|
||||||
public int seekTo(byte[] key, int offset, int length) throws IOException;
|
public int seekTo(byte[] key, int offset, int length) throws IOException;
|
||||||
|
/**
|
||||||
|
* Reseek to or just before the passed <code>key</code>. Similar to seekTo
|
||||||
|
* except that this can be called even if the scanner is not at the beginning
|
||||||
|
* of a file.
|
||||||
|
* This can be used to seek only to keys which come after the current position
|
||||||
|
* of the scanner.
|
||||||
|
* Consider the key stream of all the keys in the file,
|
||||||
|
* <code>k[0] .. k[n]</code>, where there are n keys in the file after
|
||||||
|
* current position of HFileScanner.
|
||||||
|
* The scanner will position itself between k[i] and k[i+1] where
|
||||||
|
* k[i] < key <= k[i+1].
|
||||||
|
* If there is no key k[i+1] greater than or equal to the input key, then the
|
||||||
|
* scanner will position itself at the end of the file and next() will return
|
||||||
|
* false when it is called.
|
||||||
|
* @param key Key to find (should be non-null)
|
||||||
|
* @return -1, if key < k[0], no position;
|
||||||
|
* 0, such that k[i] = key and scanner is left in position i; and
|
||||||
|
* 1, such that k[i] < key, and scanner is left in position i.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public int reseekTo(byte[] key) throws IOException;
|
||||||
|
public int reseekTo(byte[] key, int offset, int length) throws IOException;
|
||||||
/**
|
/**
|
||||||
* Consider the key stream of all the keys in the file,
|
* Consider the key stream of all the keys in the file,
|
||||||
* <code>k[0] .. k[n]</code>, where there are n keys in the file.
|
* <code>k[0] .. k[n]</code>, where there are n keys in the file.
|
||||||
|
|
|
@ -220,6 +220,33 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean reseek(KeyValue seekKey) throws IOException {
|
||||||
|
//This function is very identical to the seek(KeyValue) function except that
|
||||||
|
//scanner.seek(seekKey) is changed to scanner.reseek(seekKey)
|
||||||
|
if (this.current == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
this.heap.add(this.current);
|
||||||
|
this.current = null;
|
||||||
|
|
||||||
|
KeyValueScanner scanner;
|
||||||
|
while ((scanner = this.heap.poll()) != null) {
|
||||||
|
KeyValue topKey = scanner.peek();
|
||||||
|
if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
|
||||||
|
// Top KeyValue is at-or-after Seek KeyValue
|
||||||
|
this.current = scanner;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!scanner.reseek(seekKey)) {
|
||||||
|
scanner.close();
|
||||||
|
} else {
|
||||||
|
this.heap.add(scanner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Heap is returning empty, scanner is done
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the current Heap
|
* @return the current Heap
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -46,6 +46,16 @@ public interface KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
public boolean seek(KeyValue key) throws IOException;
|
public boolean seek(KeyValue key) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reseek the scanner at or after the specified KeyValue.
|
||||||
|
* This method is guaranteed to seek to or before the required key only if the
|
||||||
|
* key comes after the current position of the scanner. Should not be used
|
||||||
|
* to seek to a key which may come before the current position.
|
||||||
|
* @param key seek value (should be non-null)
|
||||||
|
* @return true if scanner has values left, false if end of scanner
|
||||||
|
*/
|
||||||
|
public boolean reseek(KeyValue key) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the KeyValue scanner.
|
* Close the KeyValue scanner.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -20,7 +20,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.RuntimeMXBean;
|
import java.lang.management.RuntimeMXBean;
|
||||||
import java.rmi.UnexpectedException;
|
import java.rmi.UnexpectedException;
|
||||||
|
@ -601,6 +600,20 @@ public class MemStore implements HeapSize {
|
||||||
return lowest != null;
|
return lowest != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean reseek(KeyValue key) {
|
||||||
|
while (kvsetNextRow != null &&
|
||||||
|
comparator.compare(kvsetNextRow, key) < 0) {
|
||||||
|
kvsetNextRow = getNext(kvsetIt);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (snapshotNextRow != null &&
|
||||||
|
comparator.compare(snapshotNextRow, key) < 0) {
|
||||||
|
snapshotNextRow = getNext(snapshotIt);
|
||||||
|
}
|
||||||
|
return (kvsetNextRow != null || snapshotNextRow != null);
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized KeyValue peek() {
|
public synchronized KeyValue peek() {
|
||||||
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
|
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
|
||||||
return getLowest();
|
return getLowest();
|
||||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -73,6 +72,10 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
||||||
throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
|
throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean reseek(KeyValue key) {
|
||||||
|
return seek(key);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* High performance merge scan.
|
* High performance merge scan.
|
||||||
* @param writer
|
* @param writer
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class ScanQueryMatcher {
|
||||||
// could optimize this, if necessary?
|
// could optimize this, if necessary?
|
||||||
// Could also be called SEEK_TO_CURRENT_ROW, but this
|
// Could also be called SEEK_TO_CURRENT_ROW, but this
|
||||||
// should be rare/never happens.
|
// should be rare/never happens.
|
||||||
return MatchCode.SKIP;
|
return MatchCode.SEEK_NEXT_ROW;
|
||||||
}
|
}
|
||||||
|
|
||||||
// optimize case.
|
// optimize case.
|
||||||
|
@ -150,7 +150,7 @@ public class ScanQueryMatcher {
|
||||||
long timestamp = kv.getTimestamp();
|
long timestamp = kv.getTimestamp();
|
||||||
if (isExpired(timestamp)) {
|
if (isExpired(timestamp)) {
|
||||||
// done, the rest of this column will also be expired as well.
|
// done, the rest of this column will also be expired as well.
|
||||||
return MatchCode.SEEK_NEXT_COL;
|
return getNextRowOrNextColumn(bytes, offset, qualLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
byte type = kv.getType();
|
byte type = kv.getType();
|
||||||
|
@ -194,6 +194,8 @@ public class ScanQueryMatcher {
|
||||||
} else if (filterResponse == ReturnCode.NEXT_ROW) {
|
} else if (filterResponse == ReturnCode.NEXT_ROW) {
|
||||||
stickyNextRow = true;
|
stickyNextRow = true;
|
||||||
return MatchCode.SEEK_NEXT_ROW;
|
return MatchCode.SEEK_NEXT_ROW;
|
||||||
|
} else if (filterResponse == ReturnCode.SEEK_NEXT_USING_HINT) {
|
||||||
|
return MatchCode.SEEK_NEXT_USING_HINT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,6 +274,14 @@ public class ScanQueryMatcher {
|
||||||
return this.startKey;
|
return this.startKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public KeyValue getNextKeyHint(KeyValue kv) {
|
||||||
|
if (filter == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return filter.getNextKeyHint(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link #match} return codes. These instruct the scanner moving through
|
* {@link #match} return codes. These instruct the scanner moving through
|
||||||
* memstores and StoreFiles what to do with the current KeyValue.
|
* memstores and StoreFiles what to do with the current KeyValue.
|
||||||
|
@ -317,5 +327,10 @@ public class ScanQueryMatcher {
|
||||||
* Done with scan, thanks to the row filter.
|
* Done with scan, thanks to the row filter.
|
||||||
*/
|
*/
|
||||||
DONE_SCAN,
|
DONE_SCAN,
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Seek to next key which is given as hint.
|
||||||
|
*/
|
||||||
|
SEEK_NEXT_USING_HINT,
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -105,6 +105,20 @@ class StoreFileScanner implements KeyValueScanner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean reseek(KeyValue key) throws IOException {
|
||||||
|
try {
|
||||||
|
if (!reseekAtOrAfter(hfs, key)) {
|
||||||
|
close();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
cur = hfs.getKeyValue();
|
||||||
|
hfs.next();
|
||||||
|
return true;
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new IOException("Could not seek " + this, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
// Nothing to close on HFileScanner?
|
// Nothing to close on HFileScanner?
|
||||||
cur = null;
|
cur = null;
|
||||||
|
@ -132,6 +146,19 @@ class StoreFileScanner implements KeyValueScanner {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static boolean reseekAtOrAfter(HFileScanner s, KeyValue k)
|
||||||
|
throws IOException {
|
||||||
|
//This function is similar to seekAtOrAfter function
|
||||||
|
int result = s.reseekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
|
||||||
|
if (result <= 0) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// passed KV is larger than current KV in file, if there is a next
|
||||||
|
// it is after, if not then this scanner is done.
|
||||||
|
return s.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// StoreFile filter hook.
|
// StoreFile filter hook.
|
||||||
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
|
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
|
||||||
return reader.shouldSeek(scan, columns);
|
return reader.shouldSeek(scan, columns);
|
||||||
|
|
|
@ -279,6 +279,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
||||||
this.heap.next();
|
this.heap.next();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case SEEK_NEXT_USING_HINT:
|
||||||
|
KeyValue nextKV = matcher.getNextKeyHint(kv);
|
||||||
|
if (nextKV != null) {
|
||||||
|
reseek(nextKV);
|
||||||
|
} else {
|
||||||
|
heap.next();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("UNEXPECTED");
|
throw new RuntimeException("UNEXPECTED");
|
||||||
}
|
}
|
||||||
|
@ -324,18 +333,20 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
||||||
|
|
||||||
private void checkReseek() throws IOException {
|
private void checkReseek() throws IOException {
|
||||||
if (this.heap == null && this.lastTop != null) {
|
if (this.heap == null && this.lastTop != null) {
|
||||||
|
resetScannerStack(this.lastTop);
|
||||||
reseek(this.lastTop);
|
|
||||||
this.lastTop = null; // gone!
|
this.lastTop = null; // gone!
|
||||||
}
|
}
|
||||||
// else dont need to reseek
|
// else dont need to reseek
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reseek(KeyValue lastTopKey) throws IOException {
|
private void resetScannerStack(KeyValue lastTopKey) throws IOException {
|
||||||
if (heap != null) {
|
if (heap != null) {
|
||||||
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
|
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* When we have the scan object, should we not pass it to getScanners()
|
||||||
|
* to get a limited set of scanners? We did so in the constructor and we
|
||||||
|
* could have done it now by storing the scan object from the constructor */
|
||||||
List<KeyValueScanner> scanners = getScanners();
|
List<KeyValueScanner> scanners = getScanners();
|
||||||
|
|
||||||
for(KeyValueScanner scanner : scanners) {
|
for(KeyValueScanner scanner : scanners) {
|
||||||
|
@ -350,4 +361,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
||||||
KeyValue kv = heap.peek();
|
KeyValue kv = heap.peek();
|
||||||
matcher.setRow((kv == null ? lastTopKey : kv).getRow());
|
matcher.setRow((kv == null ? lastTopKey : kv).getRow());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean reseek(KeyValue kv) throws IOException {
|
||||||
|
//Heap cannot be null, because this is only called from next() which
|
||||||
|
//guarantees that heap will never be null before this call.
|
||||||
|
return this.heap.reseek(kv);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
package org.apache.hadoop.hbase.filter;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValueTestUtil;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestColumnPrefixFilter {
|
||||||
|
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new
|
||||||
|
HBaseTestingUtility();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testColumnPrefixFilter() throws IOException {
|
||||||
|
String family = "Family";
|
||||||
|
HTableDescriptor htd = new HTableDescriptor("TestColumnPrefixFilter");
|
||||||
|
htd.addFamily(new HColumnDescriptor(family));
|
||||||
|
HRegionInfo info = new HRegionInfo(htd, null, null, false);
|
||||||
|
HRegion region = HRegion.createHRegion(info, HBaseTestingUtility.
|
||||||
|
getTestDir(), TEST_UTIL.getConfiguration());
|
||||||
|
|
||||||
|
List<String> rows = generateRandomWords(100, "row");
|
||||||
|
List<String> columns = generateRandomWords(10000, "column");
|
||||||
|
long maxTimestamp = 2;
|
||||||
|
|
||||||
|
List<KeyValue> kvList = new ArrayList<KeyValue>();
|
||||||
|
|
||||||
|
Map<String, List<KeyValue>> prefixMap = new HashMap<String,
|
||||||
|
List<KeyValue>>();
|
||||||
|
|
||||||
|
prefixMap.put("p", new ArrayList<KeyValue>());
|
||||||
|
prefixMap.put("s", new ArrayList<KeyValue>());
|
||||||
|
|
||||||
|
String valueString = "ValueString";
|
||||||
|
|
||||||
|
for (String row: rows) {
|
||||||
|
Put p = new Put(Bytes.toBytes(row));
|
||||||
|
for (String column: columns) {
|
||||||
|
for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
|
||||||
|
KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp,
|
||||||
|
valueString);
|
||||||
|
p.add(kv);
|
||||||
|
kvList.add(kv);
|
||||||
|
for (String s: prefixMap.keySet()) {
|
||||||
|
if (column.startsWith(s)) {
|
||||||
|
prefixMap.get(s).add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
region.put(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
ColumnPrefixFilter filter;
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setMaxVersions();
|
||||||
|
for (String s: prefixMap.keySet()) {
|
||||||
|
filter = new ColumnPrefixFilter(Bytes.toBytes(s));
|
||||||
|
scan.setFilter(filter);
|
||||||
|
InternalScanner scanner = region.getScanner(scan);
|
||||||
|
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||||
|
while(scanner.next(results));
|
||||||
|
assertEquals(prefixMap.get(s).size(), results.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> generateRandomWords(int numberOfWords, String suffix) {
|
||||||
|
Set<String> wordSet = new HashSet<String>();
|
||||||
|
for (int i = 0; i < numberOfWords; i++) {
|
||||||
|
int lengthOfWords = (int) (Math.random()*2) + 1;
|
||||||
|
char[] wordChar = new char[lengthOfWords];
|
||||||
|
for (int j = 0; j < wordChar.length; j++) {
|
||||||
|
wordChar[j] = (char) (Math.random() * 26 + 97);
|
||||||
|
}
|
||||||
|
String word;
|
||||||
|
if (suffix == null) {
|
||||||
|
word = new String(wordChar);
|
||||||
|
} else {
|
||||||
|
word = new String(wordChar) + suffix;
|
||||||
|
}
|
||||||
|
wordSet.add(word);
|
||||||
|
}
|
||||||
|
List<String> wordList = new ArrayList<String>(wordSet);
|
||||||
|
return wordList;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test {@link HFileScanner#reseekTo(byte[])}
|
||||||
|
*/
|
||||||
|
public class TestReseekTo {
|
||||||
|
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReseekTo() throws Exception {
|
||||||
|
|
||||||
|
Path ncTFile = new Path(HBaseTestingUtility.getTestDir(), "basic.hfile");
|
||||||
|
FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
|
||||||
|
HFile.Writer writer = new HFile.Writer(fout, 4000, "none", null);
|
||||||
|
int numberOfKeys = 1000;
|
||||||
|
|
||||||
|
String valueString = "Value";
|
||||||
|
|
||||||
|
List<Integer> keyList = new ArrayList<Integer>();
|
||||||
|
List<String> valueList = new ArrayList<String>();
|
||||||
|
|
||||||
|
for (int key = 0; key < numberOfKeys; key++) {
|
||||||
|
String value = valueString + key;
|
||||||
|
keyList.add(key);
|
||||||
|
valueList.add(value);
|
||||||
|
writer.append(Bytes.toBytes(key), Bytes.toBytes(value));
|
||||||
|
}
|
||||||
|
writer.close();
|
||||||
|
fout.close();
|
||||||
|
|
||||||
|
HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(),
|
||||||
|
ncTFile, null, false);
|
||||||
|
reader.loadFileInfo();
|
||||||
|
HFileScanner scanner = reader.getScanner(false, true);
|
||||||
|
|
||||||
|
scanner.seekTo();
|
||||||
|
for (int i = 0; i < keyList.size(); i++) {
|
||||||
|
Integer key = keyList.get(i);
|
||||||
|
String value = valueList.get(i);
|
||||||
|
long start = System.nanoTime();
|
||||||
|
scanner.seekTo(Bytes.toBytes(key));
|
||||||
|
System.out.println("Seek Finished in: " + (System.nanoTime() - start)/1000 + " µs");
|
||||||
|
assertEquals(value, scanner.getValueString());
|
||||||
|
}
|
||||||
|
|
||||||
|
scanner.seekTo();
|
||||||
|
for (int i = 0; i < keyList.size(); i += 10) {
|
||||||
|
Integer key = keyList.get(i);
|
||||||
|
String value = valueList.get(i);
|
||||||
|
long start = System.nanoTime();
|
||||||
|
scanner.reseekTo(Bytes.toBytes(key));
|
||||||
|
System.out.println("Reseek Finished in: " + (System.nanoTime() - start)/1000 + " µs");
|
||||||
|
assertEquals(value, scanner.getValueString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -94,6 +94,11 @@ public class KeyValueScanFixture implements KeyValueScanner {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean reseek(KeyValue key) {
|
||||||
|
return seek(key);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
// noop.
|
// noop.
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
@ -255,6 +254,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean reseek(KeyValue key) throws IOException {
|
||||||
|
return seek(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
ScanQueryMatcher.MatchCode.INCLUDE,
|
ScanQueryMatcher.MatchCode.INCLUDE,
|
||||||
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
|
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
|
||||||
ScanQueryMatcher.MatchCode.INCLUDE,
|
ScanQueryMatcher.MatchCode.INCLUDE,
|
||||||
ScanQueryMatcher.MatchCode.SEEK_NEXT_COL,
|
ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW,
|
||||||
ScanQueryMatcher.MatchCode.DONE
|
ScanQueryMatcher.MatchCode.DONE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue