HBASE-2794 Utilize ROWCOL bloom filter if multiple columns within same family
are requested in a Get (Mikhail Bautin)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1177768 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-30 18:35:27 +00:00
parent e960eab790
commit 1d621be994
17 changed files with 562 additions and 81 deletions
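
For context, this optimization applies when a Get (or an equivalent single-row
Scan) names an explicit set of columns within one family: each store file that
carries a ROWCOL Bloom filter can then be consulted per row/column before any
disk seek is issued. A minimal client-side sketch of such a request (table,
family and qualifier names are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiColumnGetExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "myTable");
        try {
          Get get = new Get(Bytes.toBytes("row1"));
          // Several columns of the same family requested explicitly.
          get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
          get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"));
          get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q3"));
          Result result = table.get(get);
          System.out.println("Cells returned: " + result.size());
        } finally {
          table.close();
        }
      }
    }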


@ -566,6 +566,8 @@ Release 0.92.0 - Unreleased
(Jesse Yates)
HBASE-4499 [replication] Source shouldn't update ZK if it didn't progress
(Chris Trezzo via JD)
HBASE-2794 Utilize ROWCOL bloom filter if multiple columns within same family
are requested in a Get (Mikhail Bautin)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel


@ -1739,6 +1739,23 @@ public class KeyValue implements Writable, HeapSize {
HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
}
/**
* Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
* byte[], int, int)} but creates the last key on the row/column of this KV
* (the value part of the returned KV is always empty). Used in creating
* "fake keys" for the multi-column Bloom filter optimization to skip the
* row/column we already know is not in the file.
* @return the last key on the row/column of this key-value pair
*/
public KeyValue createLastOnRowCol() {
return new KeyValue(
bytes, getRowOffset(), getRowLength(),
bytes, getFamilyOffset(), getFamilyLength(),
bytes, getQualifierOffset(), getQualifierLength(),
HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
}
/**
* @param b
* @return A KeyValue made of a byte array that holds the key-only part.
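
The "fake key" produced by createLastOnRowCol relies on KeyValue ordering: with
HConstants.OLDEST_TIMESTAMP and Type.Minimum, the generated key sorts after
every real version of the same row/column, so a scanner positioned on it looks
as if it had already passed that column. A small sketch of that property
(row, column and value are hypothetical):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FakeKeyOrderingExample {
      public static void main(String[] args) {
        KeyValue real = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
            Bytes.toBytes("q1"), 1234L, Bytes.toBytes("value"));
        // Same row/family/qualifier, but OLDEST_TIMESTAMP and Type.Minimum
        // make this the greatest possible key for that column.
        KeyValue fake = real.createLastOnRowCol();
        System.out.println(KeyValue.COMPARATOR.compare(real, fake) < 0); // true
      }
    }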


@ -351,4 +351,8 @@ public abstract class AbstractHFileReader implements HFile.Reader {
return fsBlockReader;
}
public Path getPath() {
return path;
}
}


@ -318,6 +318,8 @@ public class HFile {
* version. Knows nothing about how that metadata is structured.
*/
DataInput getBloomFilterMetadata() throws IOException;
Path getPath();
}
private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,


@ -0,0 +1,33 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
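
/**
 * Base class for {@link KeyValueScanner} implementations that have no Bloom
 * filter to consult: {@link #seekExactly(KeyValue, boolean)} simply falls back
 * to a regular reseek or seek.
 */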
public abstract class AbstractKeyValueScanner implements KeyValueScanner {
@Override
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
return forward ? reseek(kv) : seek(kv);
}
}


@ -2816,6 +2816,10 @@ public class HRegion implements HeapSize { // , Writable{
}
this.filterClosed = true;
}
KeyValueHeap getStoreHeapForTesting() {
return storeHeap;
}
}
// Utility methods


@ -45,6 +45,30 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
private KeyValueScanner current = null;
private KVScannerComparator comparator;
/**
* A helper enum that knows how to call the correct seek function within a
* {@link KeyValueScanner}.
*/
public enum SeekType {
NORMAL {
@Override
public boolean seek(KeyValueScanner scanner, KeyValue kv,
boolean forward) throws IOException {
return forward ? scanner.reseek(kv) : scanner.seek(kv);
}
},
EXACT {
@Override
public boolean seek(KeyValueScanner scanner, KeyValue kv,
boolean forward) throws IOException {
return scanner.seekExactly(kv, forward);
}
};
public abstract boolean seek(KeyValueScanner scanner, KeyValue kv,
boolean forward) throws IOException;
}
/**
* Constructor. This KeyValueHeap will handle closing of passed in
* KeyValueScanners.
@ -210,54 +234,53 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
* @return true if KeyValues exist at or after specified key, false if not
* @throws IOException
*/
@Override
public boolean seek(KeyValue seekKey) throws IOException {
if (this.current == null) {
return false;
}
this.heap.add(this.current);
this.current = null;
KeyValueScanner scanner;
while((scanner = this.heap.poll()) != null) {
KeyValue topKey = scanner.peek();
if(comparator.getComparator().compare(seekKey, topKey) <= 0) { // Correct?
// Top KeyValue is at-or-after Seek KeyValue
this.current = scanner;
return true;
}
if(!scanner.seek(seekKey)) {
scanner.close();
} else {
this.heap.add(scanner);
}
}
// Heap is returning empty, scanner is done
return false;
return generalizedSeek(seekKey, SeekType.NORMAL, false);
}
/**
* This function is identical to the {@link #seek(KeyValue)} function except
* that the underlying scanners are reseeked rather than seeked.
*/
@Override
public boolean reseek(KeyValue seekKey) throws IOException {
//This function is very identical to the seek(KeyValue) function except that
//scanner.seek(seekKey) is changed to scanner.reseek(seekKey)
if (this.current == null) {
return generalizedSeek(seekKey, SeekType.NORMAL, true);
}
/**
* {@inheritDoc}
*/
@Override
public boolean seekExactly(KeyValue seekKey, boolean forward)
throws IOException {
return generalizedSeek(seekKey, SeekType.EXACT, forward);
}
private boolean generalizedSeek(KeyValue seekKey, SeekType seekType,
boolean forward) throws IOException {
if (current == null) {
return false;
}
this.heap.add(this.current);
this.current = null;
heap.add(current);
current = null;
KeyValueScanner scanner;
while ((scanner = this.heap.poll()) != null) {
while ((scanner = heap.poll()) != null) {
KeyValue topKey = scanner.peek();
if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
// Top KeyValue is at-or-after Seek KeyValue
this.current = scanner;
current = scanner;
return true;
}
if (!scanner.reseek(seekKey)) {
if (!seekType.seek(scanner, seekKey, forward)) {
scanner.close();
} else {
this.heap.add(scanner);
heap.add(scanner);
}
}
// Heap is returning empty, scanner is done
return false;
}
@ -273,4 +296,8 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
public long getSequenceID() {
return 0;
}
KeyValueScanner getCurrentForTesting() {
return current;
}
}
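
With this refactoring, all three seek flavors of the heap funnel through
generalizedSeek and differ only in the SeekType and the forward flag they pass.
Restating the mapping from the code above (heap and kv stand for an existing
KeyValueHeap and seek key; no new API is introduced):

    heap.seek(kv);                 // generalizedSeek(kv, SeekType.NORMAL, false) -> scanner.seek(kv)
    heap.reseek(kv);               // generalizedSeek(kv, SeekType.NORMAL, true)  -> scanner.reseek(kv)
    heap.seekExactly(kv, forward); // generalizedSeek(kv, SeekType.EXACT, forward) -> scanner.seekExactly(kv, forward)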


@ -48,7 +48,7 @@ public interface KeyValueScanner {
/**
* Reseek the scanner at or after the specified KeyValue.
* This method is guaranteed to seek to or before the required key only if the
* This method is guaranteed to seek at or after the required key only if the
* key comes after the current position of the scanner. Should not be used
* to seek to a key which may come before the current position.
* @param key seek value (should be non-null)
@ -56,6 +56,16 @@ public interface KeyValueScanner {
*/
public boolean reseek(KeyValue key) throws IOException;
/**
* Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
* does a seek operation after checking that it is really necessary for the
* row/column combination specified by the kv parameter. This function was
* added to avoid unnecessary disk seeks on multi-column get queries using
* Bloom filter checking. Should only be used for queries where the set of
* columns is specified exactly.
*/
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException;
/**
* Get the sequence id associated with this KeyValueScanner. This is required
* for comparing multiple files to find out which one has the latest data.


@ -632,7 +632,7 @@ public class MemStore implements HeapSize {
* map and snapshot.
* This behaves as if it were a real scanner but does not maintain position.
*/
protected class MemStoreScanner implements KeyValueScanner {
protected class MemStoreScanner extends AbstractKeyValueScanner {
// Next row information for either kvset or snapshot
private KeyValue kvsetNextRow = null;
private KeyValue snapshotNextRow = null;


@ -58,6 +58,12 @@ public class ScanQueryMatcher {
/** Row the query is on */
protected byte [] row;
/**
* True if we are only interested in the given exact set of columns. In that
* case we can use Bloom filters to avoid unnecessary disk seeks.
*/
private boolean exactColumnQuery;
/**
* Constructs a ScanQueryMatcher for a Scan.
@ -88,8 +94,10 @@ public class ScanQueryMatcher {
// between rows, not between storefiles.
this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
ttl);
exactColumnQuery = true;
}
}
public ScanQueryMatcher(Scan scan, byte [] family,
NavigableSet<byte[]> columns, long ttl,
KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
@ -302,6 +310,10 @@ public class ScanQueryMatcher {
null, 0, 0);
}
public boolean isExactColumnQuery() {
return exactColumnQuery;
}
/**
* {@link #match} return codes. These instruct the scanner moving through
* memstores and StoreFiles what to do with the current KeyValue.
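
Note that exactColumnQuery only becomes true when the caller names its columns
explicitly; a whole-family request keeps the ordinary seek path. A sketch with
hypothetical row, family and qualifier names:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ExactColumnQueryExample {
      public static void main(String[] args) {
        // Explicit column set: ScanQueryMatcher builds an ExplicitColumnTracker
        // and exactColumnQuery becomes true, enabling the seekExactly path.
        Scan explicitColumns = new Scan(Bytes.toBytes("row1"), Bytes.toBytes("row1"));
        explicitColumns.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));
        explicitColumns.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"));

        // Whole-family request: no explicit column set, so exactColumnQuery
        // stays false and store scanners are seeked normally.
        Scan wholeFamily = new Scan(Bytes.toBytes("row1"), Bytes.toBytes("row1"));
        wholeFamily.addFamily(Bytes.toBytes("cf"));
      }
    }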


@ -1077,12 +1077,95 @@ public class StoreFile {
}
}
/**
* Checks whether the given scan passes the Bloom filter (if present). Only
* checks Bloom filters for single-row or single-row-column scans. Bloom
* filter checking for multi-gets is implemented as part of the store
* scanner system (see {@link StoreFileScanner#seekExactly}) and uses
* the lower-level API {@link #passesBloomFilter(byte[], int, int, byte[],
* int, int)}.
*
* @param scan the scan specification. Used to determine the row, and to
* check whether this is a single-row ("get") scan.
* @param columns the set of columns. Only used for row-column Bloom
* filters.
* @return true if the scan with the given column set passes the Bloom
* filter, or if the Bloom filter is not applicable for the scan.
* False if the Bloom filter is applicable and the scan fails it.
*/
private boolean passesBloomFilter(Scan scan,
final SortedSet<byte[]> columns) {
if (!scan.isGetScan())
// Multi-column non-get scans will use Bloom filters through the
// lower-level API function that this function calls.
if (!scan.isGetScan()) {
return true;
}
byte[] row = scan.getStartRow();
switch (this.bloomFilterType) {
case ROW:
return passesBloomFilter(row, 0, row.length, null, 0, 0);
case ROWCOL:
if (columns != null && columns.size() == 1) {
byte[] column = columns.first();
return passesBloomFilter(row, 0, row.length, column, 0,
column.length);
}
// For multi-column queries the Bloom filter is checked from the
// seekExact operation.
return true;
default:
return true;
}
}
/**
* A method for checking Bloom filters. Called directly from
* {@link StoreFileScanner} in case of a multi-column query.
*
* @param row the row key array
* @param rowOffset offset of the row key within the array
* @param rowLen length of the row key
* @param col the column qualifier array, or null for a row-only check
* @param colOffset offset of the qualifier within the array
* @param colLen length of the qualifier
* @return true if the row/column may be present in this store file, or if
*         the Bloom filter is not applicable; false if it is definitely absent
*/
public boolean passesBloomFilter(byte[] row, int rowOffset, int rowLen,
byte[] col, int colOffset, int colLen) {
if (bloomFilter == null)
return true;
byte[] key;
switch (bloomFilterType) {
case ROW:
if (col != null) {
throw new RuntimeException("Row-only Bloom filter called with " +
"column specified");
}
if (rowOffset != 0 || rowLen != row.length) {
throw new AssertionError("For row-only Bloom filters the row "
+ "must occupy the whole array");
}
key = row;
break;
case ROWCOL:
key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
colOffset, colLen);
break;
default:
return true;
}
// Cache Bloom filter as a local variable in case it is set to null by
// another thread on an IO error.
BloomFilter bloomFilter = this.bloomFilter;
if (bloomFilter == null) {
return true;
}
@ -1091,26 +1174,6 @@ public class StoreFile {
if (reader.getTrailer().getEntryCount() == 0)
return false;
byte[] row = scan.getStartRow();
byte[] key;
switch (this.bloomFilterType) {
case ROW:
key = row;
break;
case ROWCOL:
if (columns != null && columns.size() == 1) {
byte[] column = columns.first();
key = bloomFilter.createBloomKey(row, 0, row.length,
column, 0, column.length);
break;
}
return true;
default:
return true;
}
try {
boolean shouldCheckBloom;
ByteBuffer bloom;
@ -1286,6 +1349,10 @@ public class StoreFile {
HFile.Reader getHFileReader() {
return reader;
}
void disableBloomFilterForTesting() {
bloomFilter = null;
}
}
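
The lower-level check is what enables per-column skipping. The guard pattern it
supports looks roughly as follows, assuming an open StoreFile.Reader named
reader and a seek target kv with an explicit row and qualifier; this mirrors
StoreFileScanner.seekExactly in the next file rather than introducing new API:

    boolean maybePresent = reader.passesBloomFilter(
        kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
        kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
    if (!maybePresent) {
      // The row/column is definitely not in this store file: skip the disk
      // seek and position the scanner on a fake "last on row/column" key.
    }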
/**


@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import java.io.IOException;
import java.util.ArrayList;
@ -167,4 +168,32 @@ class StoreFileScanner implements KeyValueScanner {
public long getSequenceID() {
return reader.getSequenceID();
}
@Override
public boolean seekExactly(KeyValue kv, boolean forward)
throws IOException {
if (reader.getBloomFilterType() != StoreFile.BloomType.ROWCOL ||
kv.getRowLength() == 0 || kv.getQualifierLength() == 0) {
return forward ? reseek(kv) : seek(kv);
}
boolean isInBloom = reader.passesBloomFilter(kv.getBuffer(),
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
kv.getQualifierOffset(), kv.getQualifierLength());
if (isInBloom) {
// This row/column might be in this store file. Do a normal seek.
return forward ? reseek(kv) : seek(kv);
}
// Create a fake key/value, so that this scanner only bubbles up to the top
// of the KeyValueHeap in StoreScanner after we scanned this row/column in
// all other store files. The query matcher will then just skip this fake
// key/value and the store scanner will progress to the next column.
cur = kv.createLastOnRowCol();
return true;
}
Reader getReaderForTesting() {
return reader;
}
}


@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
@ -71,10 +72,14 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// pass columns = try to filter out unnecessary ScanFiles
List<KeyValueScanner> scanners = getScanners(scan, columns);
// Seek all scanners to the start of the Row (or if the exact maching row key does not
// exist, then to the start of the next matching Row).
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
if (matcher.isExactColumnQuery()) {
for (KeyValueScanner scanner : scanners)
scanner.seekExactly(matcher.getStartKey(), false);
} else {
for (KeyValueScanner scanner : scanners)
scanner.seek(matcher.getStartKey());
}
// Combine all seeked scanners with a heap
@ -406,11 +411,31 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
public synchronized boolean reseek(KeyValue kv) throws IOException {
//Heap cannot be null, because this is only called from next() which
//guarantees that heap will never be null before this call.
return this.heap.reseek(kv);
return matcher.isExactColumnQuery() ? heap.seekExactly(kv, true) :
heap.reseek(kv);
}
@Override
public long getSequenceID() {
return 0;
}
@Override
public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
throw new NotImplementedException();
}
/**
* Used in testing.
* @return all scanners in no particular order
*/
List<KeyValueScanner> getAllScannersForTesting() {
List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
KeyValueScanner current = heap.getCurrentForTesting();
if (current != null)
allScanners.add(current);
for (KeyValueScanner scanner : heap.getHeap())
allScanners.add(scanner);
return allScanners;
}
}


@ -26,13 +26,14 @@ import java.util.List;
import java.util.SortedSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.AbstractKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
/**
* Utility scanner that wraps a sortable collection and serves
* as a KeyValueScanner.
*/
public class CollectionBackedScanner implements KeyValueScanner {
public class CollectionBackedScanner extends AbstractKeyValueScanner {
final private Iterable<KeyValue> data;
final KeyValue.KVComparator comparator;
private Iterator<KeyValue> iter;


@ -210,6 +210,8 @@ public class TestKeyValueHeap extends HBaseTestCase {
}
private static class Scanner extends CollectionBackedScanner {
private Iterator<KeyValue> iter;
private KeyValue current;
private boolean closed = false;
public Scanner(List<KeyValue> list) {


@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -59,9 +60,10 @@ public class TestMultiColumnScanner {
private static final Log LOG = LogFactory.getLog(TestMultiColumnScanner.class);
private static final String FAMILY = "CF";
private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
private static final int MAX_VERSIONS = 50;
private static final String TABLE_NAME = "TestMultiColumnScanner";
static final String FAMILY = "CF";
static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
static final int MAX_VERSIONS = 50;
/**
* The size of the column qualifier set used. Increasing this parameter
@ -84,7 +86,13 @@ public class TestMultiColumnScanner {
Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
/** The probability that a column is skipped in a store file. */
private static final double COLUMN_SKIP_PROBABILITY = 0.7;
private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
/** The probability of skipping a column in a single row */
private static final double COLUMN_SKIP_IN_ROW_PROB = 0.1;
/** The probability of skipping a column everywhere */
private static final double COLUMN_SKIP_EVERYWHERE_PROB = 0.1;
/** The probability to delete a row/column pair */
private static final double DELETE_PROBABILITY = 0.02;
@ -122,16 +130,7 @@ public class TestMultiColumnScanner {
@Test
public void testMultiColumnScanner() throws IOException {
String table = "TestMultiColumnScanner";
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
comprAlgo.getName(), HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE, HColumnDescriptor.DEFAULT_TTL,
bloomType.toString());
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(hcd);
HRegionInfo info = new HRegionInfo(Bytes.toBytes(table), null, null, false);
HRegion region = HRegion.createHRegion(info,
HBaseTestingUtility.getTestDir(), TEST_UTIL.getConfiguration(), htd);
HRegion region = createRegion(TABLE_NAME, comprAlgo, bloomType);
List<String> rows = sequentialStrings("row", NUM_ROWS);
List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
List<KeyValue> kvs = new ArrayList<KeyValue>();
@ -142,11 +141,30 @@ public class TestMultiColumnScanner {
Map<String, Long> lastDelTimeMap = new HashMap<String, Long>();
Random rand = new Random(29372937L);
Set<String> rowQualSkip = new HashSet<String>();
// Skip some columns in some rows. We need to test scanning over a set
// of columns when some of the columns are not there.
for (String row : rows)
for (String qual : qualifiers)
if (rand.nextDouble() < COLUMN_SKIP_IN_ROW_PROB) {
LOG.info("Skipping " + qual + " in row " + row);
rowQualSkip.add(rowQualKey(row, qual));
}
// Also skip some columns in all rows.
for (String qual : qualifiers)
if (rand.nextDouble() < COLUMN_SKIP_EVERYWHERE_PROB) {
LOG.info("Skipping " + qual + " in all rows");
for (String row : rows)
rowQualSkip.add(rowQualKey(row, qual));
}
for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
for (String qual : qualifiers) {
// This is where we decide to include or not include this column into
// this store file, regardless of row and timestamp.
if (rand.nextDouble() < COLUMN_SKIP_PROBABILITY)
if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB)
continue;
byte[] qualBytes = Bytes.toBytes(qual);
@ -154,7 +172,8 @@ public class TestMultiColumnScanner {
Put p = new Put(Bytes.toBytes(row));
for (long ts : TIMESTAMPS) {
String value = createValue(row, qual, ts);
KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
value);
assertEquals(kv.getTimestamp(), ts);
p.add(kv);
String keyAsString = kv.toString();
@ -241,10 +260,31 @@ public class TestMultiColumnScanner {
}
}
}
assertTrue("This test is supposed to delete at least some row/column "
+ "pairs", lastDelTimeMap.size() > 0);
LOG.info("Number of row/col pairs deleted at least once: "
+ lastDelTimeMap.size());
assertTrue("This test is supposed to delete at least some row/column " +
"pairs", lastDelTimeMap.size() > 0);
LOG.info("Number of row/col pairs deleted at least once: " +
lastDelTimeMap.size());
region.close();
}
static HRegion createRegion(String tableName,
Compression.Algorithm comprAlgo, BloomType bloomType)
throws IOException {
HColumnDescriptor hcd =
new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
comprAlgo.getName(),
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_TTL,
bloomType.toString());
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(hcd);
HRegionInfo info =
new HRegionInfo(Bytes.toBytes(tableName), null, null, false);
HRegion region = HRegion.createHRegion(
info, HBaseTestingUtility.getTestDir(), TEST_UTIL.getConfiguration(),
htd);
return region;
}
private static String getRowQualStr(KeyValue kv) {
@ -269,7 +309,11 @@ public class TestMultiColumnScanner {
kv.getQualifierLength());
}
private static String createValue(String row, String qual, long ts) {
private static String rowQualKey(String row, String qual) {
return row + "_" + qual;
}
static String createValue(String row, String qual, long ts) {
return "value_for_" + row + "_" + qual + "_" + ts;
}


@ -0,0 +1,202 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import static org.apache.hadoop.hbase.regionserver.TestMultiColumnScanner.*;
import static org.junit.Assert.*;
/**
* Test a multi-column scanner when there is a Bloom filter false-positive.
* This is needed for the multi-column Bloom filter optimization.
*/
@RunWith(Parameterized.class)
public class TestScanWithBloomError {
private static final Log LOG =
LogFactory.getLog(TestScanWithBloomError.class);
private static final String TABLE_NAME = "ScanWithBloomError";
private static final String ROW = "theRow";
private static final String QUALIFIER_PREFIX = "qual";
private static final byte[] ROW_BYTES = Bytes.toBytes(ROW);
private static NavigableSet<Integer> allColIds = new TreeSet<Integer>();
private HRegion region;
private StoreFile.BloomType bloomType;
private FileSystem fs;
private Configuration conf;
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
@Parameters
public static final Collection<Object[]> parameters() {
List<Object[]> configurations = new ArrayList<Object[]>();
for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
configurations.add(new Object[] { bloomType });
}
return configurations;
}
public TestScanWithBloomError(StoreFile.BloomType bloomType) {
this.bloomType = bloomType;
}
@Before
public void setUp() throws IOException{
conf = TEST_UTIL.getConfiguration();
fs = FileSystem.get(conf);
}
@Test
public void testThreeStoreFiles() throws IOException {
region = createRegion(TABLE_NAME, Compression.Algorithm.GZ, bloomType);
createStoreFile(new int[] {1, 2, 6});
createStoreFile(new int[] {1, 2, 3, 7});
createStoreFile(new int[] {1, 9});
scanColSet(new int[]{1, 4, 6, 7}, new int[]{1, 6, 7});
region.close();
}
private void scanColSet(int[] colSet, int[] expectedResultCols)
throws IOException {
LOG.info("Scanning column set: " + Arrays.toString(colSet));
Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
addColumnSetToScan(scan, colSet);
RegionScannerImpl scanner = (RegionScannerImpl) region.getScanner(scan);
KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
assertEquals(0, storeHeap.getHeap().size());
StoreScanner storeScanner =
(StoreScanner) storeHeap.getCurrentForTesting();
@SuppressWarnings({ "unchecked", "rawtypes" })
List<StoreFileScanner> scanners = (List<StoreFileScanner>)
(List) storeScanner.getAllScannersForTesting();
// Sort scanners by their HFile's modification time.
Collections.sort(scanners, new Comparator<StoreFileScanner>() {
@Override
public int compare(StoreFileScanner s1, StoreFileScanner s2) {
Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
long t1, t2;
try {
t1 = fs.getFileStatus(p1).getModificationTime();
t2 = fs.getFileStatus(p2).getModificationTime();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
return t1 < t2 ? -1 : (t1 == t2 ? 0 : 1);
}
});
StoreFile.Reader lastStoreFileReader = null;
for (StoreFileScanner sfScanner : scanners)
lastStoreFileReader = sfScanner.getReaderForTesting();
new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
lastStoreFileReader.getHFileReader().getPath().toString()});
// Disable Bloom filter for the last store file. The disabled Bloom filter
// will always return "true".
LOG.info("Disabling Bloom filter for: "
+ lastStoreFileReader.getHFileReader().getName());
lastStoreFileReader.disableBloomFilterForTesting();
List<KeyValue> allResults = new ArrayList<KeyValue>();
{ // Limit the scope of results.
List<KeyValue> results = new ArrayList<KeyValue>();
while (scanner.next(results) || results.size() > 0) {
allResults.addAll(results);
results.clear();
}
}
List<Integer> actualIds = new ArrayList<Integer>();
for (KeyValue kv : allResults) {
String qual = Bytes.toString(kv.getQualifier());
assertTrue(qual.startsWith(QUALIFIER_PREFIX));
actualIds.add(Integer.valueOf(qual.substring(
QUALIFIER_PREFIX.length())));
}
List<Integer> expectedIds = new ArrayList<Integer>();
for (int expectedId : expectedResultCols)
expectedIds.add(expectedId);
LOG.info("Column ids returned: " + actualIds + ", expected: "
+ expectedIds);
assertEquals(expectedIds.toString(), actualIds.toString());
}
private void addColumnSetToScan(Scan scan, int[] colIds) {
for (int colId : colIds)
scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualFromId(colId)));
}
private String qualFromId(int colId) {
return QUALIFIER_PREFIX + colId;
}
private void createStoreFile(int[] colIds)
throws IOException {
Put p = new Put(ROW_BYTES);
for (int colId : colIds) {
long ts = Long.MAX_VALUE;
String qual = qualFromId(colId);
allColIds.add(colId);
KeyValue kv = KeyValueTestUtil.create(ROW, FAMILY,
qual, ts, createValue(ROW, qual, ts));
p.add(kv);
}
region.put(p);
region.flushcache();
}
}