HBASE-20636 Introduce two bloom filter types: ROWPREFIX and ROWPREFIX_DELIMITED

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Amending-Author: Andrew Purtell <apurtell@apache.org>
Guangxu Cheng 2018-05-29 20:11:42 +08:00 committed by Andrew Purtell
parent 29214c77a6
commit fd68e7593e
No known key found for this signature in database
GPG Key ID: 8597754DD5365CCD
20 changed files with 852 additions and 35 deletions

View File

@ -34,5 +34,13 @@ public enum BloomType {
/**
* Bloom enabled with Table row &amp; column (family+qualifier) as Key
*/
ROWCOL
ROWCOL,
/**
* Bloom enabled with Table row prefix as Key; the length of the prefix must be specified
*/
ROWPREFIX_FIXED_LENGTH,
/**
* Bloom enabled with Table row prefix as Key; the delimiter of the prefix must be specified
*/
ROWPREFIX_DELIMITED
}
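Illustrative only, not part of this patch: a minimal sketch of how a column family could opt into the new types by pairing the BloomType with its parameter via the column family configuration keys added to BloomFilterUtil in this change. The family name "cf", the prefix length "10", and the delimiter "#" are placeholders.

import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class RowPrefixBloomTypesExample {
  public static void main(String[] args) {
    // Fixed-length variant: the first 10 bytes of each row key become the Bloom key.
    ColumnFamilyDescriptor fixedLength = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("cf"))
        .setBloomFilterType(BloomType.ROWPREFIX_FIXED_LENGTH)
        .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10")
        .build();

    // Delimited variant: everything before the first '#' in the row key becomes the Bloom key.
    ColumnFamilyDescriptor delimited = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("cf"))
        .setBloomFilterType(BloomType.ROWPREFIX_DELIMITED)
        .setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#")
        .build();

    System.out.println(fixedLength.getBloomFilterType() + " / " + delimited.getBloomFilterType());
  }
}

Either descriptor can then be applied with Admin.modifyColumnFamily or used when creating the table.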

View File

@ -22,6 +22,7 @@ import java.util.Random;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
/**
* Action that tries to adjust the bloom filter setting on all the columns of a
@ -53,6 +54,11 @@ public class ChangeBloomFilterAction extends Action {
LOG.debug("Performing action: About to set bloom filter type to "
+ bloomType + " on column " + columnName + " of table " + tableName);
columnBuilder.setBloomFilterType(bloomType);
if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
columnBuilder.setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10");
} else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
columnBuilder.setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#");
}
});
LOG.debug("Performing action: Just set bloom filter types on table " + tableName);

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
@ -148,6 +149,8 @@ public class HFileOutputFormat2
"hbase.hfileoutputformat.families.compression";
static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.bloomtype";
static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.bloomparam";
static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.blocksize";
static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
@ -215,6 +218,7 @@ public class HFileOutputFormat2
// create a map from column family to the compression algorithm
final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
@ -383,6 +387,12 @@ public class HFileOutputFormat2
compression = compression == null ? defaultCompression : compression;
BloomType bloomType = bloomTypeMap.get(tableAndFamily);
bloomType = bloomType == null ? BloomType.NONE : bloomType;
String bloomParam = bloomParamMap.get(tableAndFamily);
if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
} else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
conf.set(BloomFilterUtil.DELIMITER_KEY, bloomParam);
}
Integer blockSize = blockSizeMap.get(tableAndFamily);
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
DataBlockEncoding encoding = overriddenEncoding;
@ -648,6 +658,8 @@ public class HFileOutputFormat2
tableDescriptors));
conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
tableDescriptors));
conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
tableDescriptors));
conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
@ -675,6 +687,8 @@ public class HFileOutputFormat2
serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
@ -722,6 +736,19 @@ public class HFileOutputFormat2
return bloomTypeMap;
}
/**
* Runs inside the task to deserialize column family to bloom filter param
* map from the configuration.
*
* @param conf to read the serialized values from
* @return a map from column family to the configured bloom filter param
*/
@VisibleForTesting
static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
}
/**
* Runs inside the task to deserialize column family to block size
* map from the configuration.
@ -889,6 +916,30 @@ public class HFileOutputFormat2
return bloomType;
};
/**
* Serialize column family to bloom param map to configuration. Invoked while
* configuring the MR job for incremental load.
*
* @param tableDescriptor
* to read the properties from
* @param conf
* to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
BloomType bloomType = familyDescriptor.getBloomFilterType();
String bloomParam = "";
if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
} else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.DELIMITER_KEY);
}
return bloomParam;
};
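For orientation only, not from the patch: a hedged sketch, assuming a MapReduce Job and RegionLocator for the target table already exist, of how a bulk-load job picks up the new parameter. bloomParamDetails serializes the family's configured prefix length or delimiter into hbase.hfileoutputformat.families.bloomparam at job-setup time, and createFamilyBloomParamMap restores it inside the task so the writer can set PREFIX_LENGTH_KEY or DELIMITER_KEY. The table name "my_table" and family "cf" are placeholders.

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadBloomParamExample {
  // job and regionLocator are assumed to be set up elsewhere for the target table.
  static void configureBulkLoad(Job job, RegionLocator regionLocator) throws IOException {
    TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("my_table"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
            .setBloomFilterType(BloomType.ROWPREFIX_DELIMITED)
            // The delimiter rides on the family descriptor; bloomParamDetails serializes it
            // at job-setup time and createFamilyBloomParamMap restores it inside the task.
            .setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#")
            .build())
        .build();
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
  }
}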
/**
* Serialize column family to data block encoding map to configuration.
* Invoked while configuring the MR job for incremental load.

View File

@ -119,6 +119,7 @@ public class LoadTestTool extends AbstractHBaseTool {
protected static final String OPT_VERBOSE = "verbose";
public static final String OPT_BLOOM = "bloom";
public static final String OPT_BLOOM_PARAM = "bloom_param";
public static final String OPT_COMPRESSION = "compression";
public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
@ -330,6 +331,7 @@ public class LoadTestTool extends AbstractHBaseTool {
addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_BLOOM_PARAM, "the parameter of the bloom filter type (prefix length or delimiter)");
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
@ -551,6 +553,22 @@ public class LoadTestTool extends AbstractHBaseTool {
bloomType = bloomStr == null ? BloomType.ROW :
BloomType.valueOf(bloomStr);
if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
} else {
conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
}
}
if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
} else {
conf.set(BloomFilterUtil.DELIMITER_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
}
}
inMemoryCF = cmd.hasOption(OPT_INMEMORY);
if (cmd.hasOption(OPT_ENCRYPTION)) {
cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));

View File

@ -174,12 +174,12 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
+ Bytes.toStringBinary(firstKeyInChunk));
}
// This will be done only once per chunk
if (bloomType == BloomType.ROW) {
firstKeyInChunk = CellUtil.copyRow(cell);
} else {
if (bloomType == BloomType.ROWCOL) {
firstKeyInChunk =
PrivateCellUtil
.getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
} else {
firstKeyInChunk = CellUtil.copyRow(cell);
}
allocateNewChunk();
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -188,6 +189,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
@ -2139,6 +2141,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// check replication scope
checkReplicationScope(hcd);
// check bloom filter type
checkBloomFilterType(hcd);
// check data replication factor, it can be 0(default value) when user has not explicitly
// set the value, in this case we use default replication factor set in the file system.
@ -2221,6 +2225,16 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
private static void checkBloomFilterType(ColumnFamilyDescriptor cfd)
throws IOException {
Configuration conf = new CompoundConfiguration().addStringMap(cfd.getConfiguration());
try {
BloomFilterUtil.getBloomFilterParam(cfd.getBloomFilterType(), conf);
} catch (IllegalArgumentException e) {
throw new DoNotRetryIOException("Failed to get bloom filter param", e);
}
}
// HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
String message, Exception cause) throws IOException {

View File

@ -86,6 +86,9 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
/** Bloom filter Type in FileInfo */
public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
/** Bloom filter param in FileInfo */
public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");
/** Delete Family Count in FileInfo */
public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.LAST_BLOOM_KEY;
@ -74,6 +75,8 @@ public class StoreFileReader {
private boolean bulkLoadResult = false;
private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
private boolean skipResetSeqId = true;
private int prefixLength = -1;
private byte[] delimiter = null;
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
@ -119,6 +122,8 @@ public class StoreFileReader {
this.bulkLoadResult = reader.bulkLoadResult;
this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV;
this.skipResetSeqId = reader.skipResetSeqId;
this.prefixLength = reader.prefixLength;
this.delimiter = reader.delimiter;
}
public boolean isPrimaryReplicaReader() {
@ -228,7 +233,7 @@ public class StoreFileReader {
/**
* Check if this storeFile may contain keys within the TimeRange that
* have not expired (i.e. not older than oldestUnexpiredTS).
* @param timeRange the timeRange to restrict
* @param tr the timeRange to restrict
* @param oldestUnexpiredTS the oldest timestamp that is not expired, as
* determined by the column family's TTL
* @return false if queried keys definitely don't exist in this StoreFile
@ -255,18 +260,18 @@ public class StoreFileReader {
* False if the Bloom filter is applicable and the scan fails it.
*/
boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
// Multi-column non-get scans will use Bloom filters through the
// lower-level API function that this function calls.
if (!scan.isGetScan()) {
return true;
}
byte[] row = scan.getStartRow();
switch (this.bloomFilterType) {
case ROW:
if (!scan.isGetScan()) {
return true;
}
return passesGeneralRowBloomFilter(row, 0, row.length);
case ROWCOL:
if (!scan.isGetScan()) {
return true;
}
if (columns != null && columns.size() == 1) {
byte[] column = columns.first();
// create the required fake key
@ -277,7 +282,10 @@ public class StoreFileReader {
// For multi-column queries the Bloom filter is checked from the
// seekExact operation.
return true;
case ROWPREFIX_FIXED_LENGTH:
return passesGeneralRowPrefixBloomFilter(scan);
case ROWPREFIX_DELIMITED:
return passesGeneralDelimitedRowPrefixBloomFilter(scan);
default:
return true;
}
@ -318,7 +326,7 @@ public class StoreFileReader {
*
* @return True if passes
*/
public boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
private boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
@ -328,7 +336,7 @@ public class StoreFileReader {
byte[] key = null;
if (rowOffset != 0 || rowLen != row.length) {
throw new AssertionError(
"For row-only Bloom filters the row " + "must occupy the whole array");
"For row-only Bloom filters the row must occupy the whole array");
}
key = row;
return checkGeneralBloomFilter(key, null, bloomFilter);
@ -358,6 +366,76 @@ public class StoreFileReader {
return checkGeneralBloomFilter(null, kvKey, bloomFilter);
}
/**
* Checks the general Bloom filter using a fixed-length row prefix as the key.
* Called from passesBloomFilter for ROWPREFIX_FIXED_LENGTH Bloom filters.
*
* @return True if passes
*/
private boolean passesGeneralRowPrefixBloomFilter(Scan scan) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
}
byte[] row = scan.getStartRow();
byte[] rowPrefix;
if (scan.isGetScan()) {
rowPrefix = Bytes.copy(row, 0, Math.min(prefixLength, row.length));
} else {
// For non-get scans
// Find out the common prefix of startRow and stopRow.
int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(),
scan.getStartRow().length, scan.getStopRow().length, 0, 0);
// startRow and stopRow have no common prefix,
// or the common prefix is shorter than prefixLength.
if (commonLength <= 0 || commonLength < prefixLength) {
return true;
}
rowPrefix = Bytes.copy(row, 0, prefixLength);
}
return checkGeneralBloomFilter(rowPrefix, null, bloomFilter);
}
/**
* Checks the general Bloom filter using the delimited row prefix as the key.
* Called from passesBloomFilter for ROWPREFIX_DELIMITED Bloom filters.
*
* @return True if passes
*/
private boolean passesGeneralDelimitedRowPrefixBloomFilter(Scan scan) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
}
byte[] row = scan.getStartRow();
byte[] rowPrefix;
if (scan.isGetScan()) {
int rowPrefixLength = Bytes.indexOf(row, delimiter);
if (rowPrefixLength <= 0) {
rowPrefix = row;
} else {
rowPrefix = Bytes.copy(row, 0, rowPrefixLength);
}
} else {
// For non-get scans
// If startRow does not contain delimiter, return true directly.
int startRowPrefixLength = Bytes.indexOf(row, delimiter);
if (startRowPrefixLength <= 0) {
return true;
}
// If stopRow does not have the same prefix as startRow, return true directly.
int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(),
startRowPrefixLength, scan.getStopRow().length, 0, 0);
if (commonLength < startRowPrefixLength) {
return true;
}
rowPrefix = Bytes.copy(row, 0, startRowPrefixLength);
}
return checkGeneralBloomFilter(rowPrefix, null, bloomFilter);
}
private boolean checkGeneralBloomFilter(byte[] key, Cell kvKey, BloomFilter bloomFilter) {
// Empty file
if (reader.getTrailer().getEntryCount() == 0) {
@ -386,10 +464,10 @@ public class StoreFileReader {
// hbase:meta does not have blooms. So we need not have special interpretation
// of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
if (keyIsAfterLast) {
if (bloomFilterType == BloomType.ROW) {
keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
} else {
if (bloomFilterType == BloomType.ROWCOL) {
keyIsAfterLast = (CellComparator.getInstance().compare(kvKey, lastBloomKeyOnlyKV)) > 0;
} else {
keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
}
}
@ -465,6 +543,13 @@ public class StoreFileReader {
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
}
byte[] p = fi.get(BLOOM_FILTER_PARAM_KEY);
if (bloomFilterType == BloomType.ROWPREFIX_FIXED_LENGTH) {
prefixLength = Bytes.toInt(p);
} else if (bloomFilterType == BloomType.ROWPREFIX_DELIMITED) {
delimiter = p;
}
lastBloomKey = fi.get(LAST_BLOOM_KEY);
if(bloomFilterType == BloomType.ROWCOL) {
lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length);
@ -665,4 +750,13 @@ public class StoreFileReader {
public interface Listener {
void storeFileReaderClosed(StoreFileReader reader);
}
public int getPrefixLength() {
return prefixLength;
}
public byte[] getDelimiter() {
return delimiter;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
@ -44,11 +45,14 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixDelimiterBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +69,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private final BloomFilterWriter generalBloomFilterWriter;
private final BloomFilterWriter deleteFamilyBloomFilterWriter;
private final BloomType bloomType;
private byte[] bloomParam = null;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
private long deleteFamilyCnt = 0;
private BloomContext bloomContext = null;
@ -110,9 +115,12 @@ public class StoreFileWriter implements CellSink, ShipperListener {
if (generalBloomFilterWriter != null) {
this.bloomType = bloomType;
this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
if (LOG.isTraceEnabled()) {
LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " +
generalBloomFilterWriter.getClass().getSimpleName());
LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
+ (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH?
Bytes.toInt(bloomParam):Bytes.toStringBinary(bloomParam))
+ ", " + generalBloomFilterWriter.getClass().getSimpleName());
}
// init bloom context
switch (bloomType) {
@ -122,9 +130,17 @@ public class StoreFileWriter implements CellSink, ShipperListener {
case ROWCOL:
bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator);
break;
case ROWPREFIX_FIXED_LENGTH:
bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, comparator,
Bytes.toInt(bloomParam));
break;
case ROWPREFIX_DELIMITED:
bloomContext = new RowPrefixDelimiterBloomContext(generalBloomFilterWriter, comparator,
bloomParam);
break;
default:
throw new IOException(
"Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL expected)");
throw new IOException("Invalid Bloom filter type: "
+ bloomType + " (ROW or ROWCOL or ROWPREFIX or ROWPREFIX_DELIMITED expected)");
}
} else {
// Not using Bloom filters.
@ -206,9 +222,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
* http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
* Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
*
* 2 Types of Filtering:
* 4 Types of Filtering:
* 1. Row = Row
* 2. RowCol = Row + Qualifier
* 3. RowPrefixFixedLength = Fixed Length Row Prefix
* 4. RowPrefixDelimiter = Delimited Row Prefix
*/
bloomContext.writeBloom(cell);
}
@ -280,6 +298,9 @@ public class StoreFileWriter implements CellSink, ShipperListener {
if (hasGeneralBloom) {
writer.addGeneralBloomFilter(generalBloomFilterWriter);
writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
if (bloomParam != null) {
writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
}
bloomContext.addLastBloomKey(writer);
}
return hasGeneralBloom;

View File

@ -187,12 +187,12 @@ public class BloomFilterChunk implements BloomFilterBase {
int hash1;
int hash2;
HashKey<Cell> hashKey;
if (this.bloomType == BloomType.ROW) {
hashKey = new RowBloomHashKey(cell);
if (this.bloomType == BloomType.ROWCOL) {
hashKey = new RowColBloomHashKey(cell);
hash1 = this.hash.hash(hashKey, 0);
hash2 = this.hash.hash(hashKey, hash1);
} else {
hashKey = new RowColBloomHashKey(cell);
hashKey = new RowBloomHashKey(cell);
hash1 = this.hash.hash(hashKey, 0);
hash2 = this.hash.hash(hashKey, hash1);
}

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util;
import java.text.NumberFormat;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.yetus.audience.InterfaceAudience;
@ -47,6 +49,9 @@ public final class BloomFilterUtil {
*/
private static Random randomGeneratorForTest;
public static final String PREFIX_LENGTH_KEY = "RowPrefixBloomFilter.prefix_length";
public static final String DELIMITER_KEY = "RowPrefixDelimitedBloomFilter.delimiter";
/** Bit-value lookup array to prevent doing the same work over and over */
public static final byte [] bitvals = {
(byte) 0x01,
@ -239,8 +244,8 @@ public final class BloomFilterUtil {
public static boolean contains(Cell cell, ByteBuff bloomBuf, int bloomOffset, int bloomSize,
Hash hash, int hashCount, BloomType type) {
HashKey<Cell> hashKey = type == BloomType.ROW ? new RowBloomHashKey(cell)
: new RowColBloomHashKey(cell);
HashKey<Cell> hashKey = type == BloomType.ROWCOL ? new RowColBloomHashKey(cell)
: new RowBloomHashKey(cell);
return contains(bloomBuf, bloomOffset, bloomSize, hash, hashCount, hashKey);
}
@ -284,4 +289,45 @@ public final class BloomFilterUtil {
return formatStats(bloomFilter) + STATS_RECORD_SEP + "Actual error rate: "
+ String.format("%.8f", bloomFilter.actualErrorRate());
}
public static byte[] getBloomFilterParam(BloomType bloomFilterType, Configuration conf)
throws IllegalArgumentException{
byte[] bloomParam = null;
String message = "Bloom filter type is " + bloomFilterType + ", ";
switch (bloomFilterType) {
case ROWPREFIX_FIXED_LENGTH:
String prefixLengthString = conf.get(PREFIX_LENGTH_KEY);
if (prefixLengthString == null) {
message += PREFIX_LENGTH_KEY + " not specified.";
throw new IllegalArgumentException(message);
}
int prefixLength;
try {
prefixLength = Integer.parseInt(prefixLengthString);
if (prefixLength <= 0 || prefixLength > HConstants.MAX_ROW_LENGTH) {
message += "the value of " + PREFIX_LENGTH_KEY
+ " must >=0 and < " + HConstants.MAX_ROW_LENGTH;
throw new IllegalArgumentException(message);
}
} catch (NumberFormatException nfe) {
message = "Number format exception when parsing " + PREFIX_LENGTH_KEY + " for BloomType "
+ bloomFilterType.toString() + ":"
+ prefixLengthString;
throw new IllegalArgumentException(message, nfe);
}
bloomParam = Bytes.toBytes(prefixLength);
break;
case ROWPREFIX_DELIMITED:
String delimiterString = conf.get(DELIMITER_KEY);
if (delimiterString == null || delimiterString.length() == 0) {
message += DELIMITER_KEY + " not specified.";
throw new IllegalArgumentException(message);
}
bloomParam = Bytes.toBytes(delimiterString);
break;
default:
break;
}
return bloomParam;
}
}
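A small usage sketch (illustrative, not from the patch) of getBloomFilterParam, which validates the per-type parameter from a Configuration and encodes it into the byte[] that StoreFileWriter stores under BLOOM_FILTER_PARAM_KEY; the prefix length of 10 is a placeholder value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class GetBloomFilterParamExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);

    // Returns Bytes.toBytes(10): the 4-byte encoding of the prefix length.
    byte[] fixedLengthParam =
        BloomFilterUtil.getBloomFilterParam(BloomType.ROWPREFIX_FIXED_LENGTH, conf);
    System.out.println(Bytes.toInt(fixedLengthParam)); // 10

    // Throws IllegalArgumentException because DELIMITER_KEY was never set.
    try {
      BloomFilterUtil.getBloomFilterParam(BloomType.ROWPREFIX_DELIMITED, conf);
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}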

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Handles ROWPREFIX_DELIMITED bloom related context.
* It works with both ByteBufferedCell and byte[] backed cells
*/
@InterfaceAudience.Private
public class RowPrefixDelimiterBloomContext extends RowBloomContext {
private final byte[] delimiter;
public RowPrefixDelimiterBloomContext(BloomFilterWriter bloomFilterWriter,
CellComparator comparator, byte[] delimiter) {
super(bloomFilterWriter, comparator);
this.delimiter = delimiter;
}
public void writeBloom(Cell cell) throws IOException {
super.writeBloom(getDelimitedRowPrefixCell(cell));
}
/**
* @param cell the original cell
* @return a new cell whose row is the delimited row prefix, or the original cell if the row
* contains no delimiter
*/
private Cell getDelimitedRowPrefixCell(Cell cell) {
byte[] row = CellUtil.copyRow(cell);
int prefixLength = Bytes.indexOf(row, delimiter);
if (prefixLength <= 0) {
return cell;
}
return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
.setRow(row, 0, Math.min(prefixLength, row.length))
.setType(Cell.Type.Put)
.build();
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Handles ROWPREFIX_FIXED_LENGTH bloom related context.
* It works with both ByteBufferedCell and byte[] backed cells
*/
@InterfaceAudience.Private
public class RowPrefixFixedLengthBloomContext extends RowBloomContext {
private final int prefixLength;
public RowPrefixFixedLengthBloomContext(BloomFilterWriter bloomFilterWriter,
CellComparator comparator, int prefixLength) {
super(bloomFilterWriter, comparator);
this.prefixLength = prefixLength;
}
public void writeBloom(Cell cell) throws IOException {
super.writeBloom(getRowPrefixCell(cell));
}
/**
* @param cell the cell
* @return a new cell whose row is the fixed-length prefix of the original row
*/
private Cell getRowPrefixCell(Cell cell) {
byte[] row = CellUtil.copyRow(cell);
return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
.setRow(row, 0, Math.min(prefixLength, row.length))
.setType(Cell.Type.Put)
.build();
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
@ -80,6 +81,8 @@ public class TestSeekBeforeWithInlineBlocks {
@Test
public void testMultiIndexLevelRandomHFileWithBlooms() throws IOException {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#");
// Try out different HFile versions to ensure reverse scan works on each version
for (int hfileVersion = HFile.MIN_FORMAT_VERSION_WITH_TAGS;
@ -101,6 +104,8 @@ public class TestSeekBeforeWithInlineBlocks {
conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize);
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
conf.set(BloomFilterUtil.DELIMITER_KEY, "#");
Cell[] cells = new Cell[NUM_KV];

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.io.BytesWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,6 +65,7 @@ public class CreateRandomStoreFile {
private static final String VALUE_SIZE_OPTION = "v";
private static final String COMPRESSION_OPTION = "c";
private static final String BLOOM_FILTER_OPTION = "bf";
private static final String BLOOM_FILTER_PARAM_OPTION = "bfp";
private static final String BLOCK_SIZE_OPTION = "bs";
private static final String BLOOM_BLOCK_SIZE_OPTION = "bfbs";
private static final String INDEX_BLOCK_SIZE_OPTION = "ibs";
@ -103,6 +105,8 @@ public class CreateRandomStoreFile {
options.addOption(BLOOM_FILTER_OPTION, "bloom_filter", true,
"Bloom filter type, one of "
+ Arrays.toString(BloomType.values()));
options.addOption(BLOOM_FILTER_PARAM_OPTION, "bloom_param", true,
"the parameter of the bloom filter");
options.addOption(BLOCK_SIZE_OPTION, "block_size", true,
"HFile block size");
options.addOption(BLOOM_BLOCK_SIZE_OPTION, "bloom_block_size", true,
@ -169,6 +173,25 @@ public class CreateRandomStoreFile {
BLOOM_FILTER_OPTION));
}
if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) {
LOG.error("the parameter of bloom filter is not specified");
return false;
} else {
conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY,
cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION));
}
}
if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) {
LOG.error("the parameter of bloom filter is not specified");
return false;
} else {
conf.set(BloomFilterUtil.DELIMITER_KEY, cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION));
}
}
int blockSize = HConstants.DEFAULT_BLOCKSIZE;
if (cmdLine.hasOption(BLOCK_SIZE_OPTION))
blockSize = Integer.valueOf(cmdLine.getOptionValue(BLOCK_SIZE_OPTION));

View File

@ -748,7 +748,7 @@ public class TestHStoreFile extends HBaseTestCase {
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount());
assertEquals(expKeys[x], reader.getGeneralBloomFilter().getKeyCount());
HStore store = mock(HStore.class);
when(store.getColumnFamilyDescriptor())

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
@ -150,6 +151,8 @@ public class TestMultiColumnScanner {
@Test
public void testMultiColumnScanner() throws IOException {
TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#");
HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
new HColumnDescriptor(FAMILY)
.setCompressionType(comprAlgo)

View File

@ -0,0 +1,399 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the ROWPREFIX_FIXED_LENGTH and ROWPREFIX_DELIMITED Bloom filter types
*/
@Category({RegionServerTests.class, SmallTests.class})
public class TestRowPrefixBloomFilter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRowPrefixBloomFilter.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRowPrefixBloomFilter.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
private static final ChecksumType CKTYPE = ChecksumType.CRC32C;
private static final int CKBYTES = 512;
private boolean localfs = false;
private static Configuration conf;
private static FileSystem fs;
private static Path testDir;
private static final int BLOCKSIZE_SMALL = 8192;
private static final float err = (float) 0.01;
private static final int prefixLength = 10;
private static final String delimiter = "#";
private static final String invalidFormatter = "%08d";
private static final String prefixFormatter = "%010d";
private static final String suffixFormatter = "%010d";
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws Exception {
conf = TEST_UTIL.getConfiguration();
conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err);
conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true);
conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, prefixLength);
conf.set(BloomFilterUtil.DELIMITER_KEY, delimiter);
localfs =
(conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0);
if (fs == null) {
fs = FileSystem.get(conf);
}
try {
if (localfs) {
testDir = TEST_UTIL.getDataTestDir("TestRowPrefixBloomFilter");
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
} else {
testDir = FSUtils.getRootDir(conf);
}
} catch (Exception e) {
LOG.error(HBaseMarkers.FATAL, "error during setup", e);
throw e;
}
}
@After
public void tearDown() throws Exception {
try {
if (localfs) {
if (fs.exists(testDir)) {
fs.delete(testDir, true);
}
}
} catch (Exception e) {
LOG.error(HBaseMarkers.FATAL, "error during tear down", e);
}
}
private static StoreFileScanner getStoreFileScanner(StoreFileReader reader) {
return reader.getStoreFileScanner(false, false, false, 0, 0, false);
}
private void writeStoreFile(final Path f, BloomType bt, int expKeys, int prefixRowCount,
int suffixRowCount) throws IOException {
HFileContext meta = new HFileContextBuilder()
.withBlockSize(BLOCKSIZE_SMALL)
.withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES)
.build();
// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withFilePath(f)
.withBloomType(bt)
.withMaxKeyCount(expKeys)
.withFileContext(meta)
.build();
long now = System.currentTimeMillis();
try {
//Put with valid row style
for (int i = 0; i < prefixRowCount; i += 2) { // prefix rows
String prefixRow = String.format(prefixFormatter, i);
for (int j = 0; j < suffixRowCount; j++) { // suffix rows
String row = prefixRow + "#" + String.format(suffixFormatter, j);
KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
Bytes.toBytes("col"), now, Bytes.toBytes("value"));
writer.append(kv);
}
}
//Put with invalid row style
for (int i = prefixRowCount; i < prefixRowCount*2; i += 2) { // prefix rows
String row = String.format(invalidFormatter, i);
KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"),
Bytes.toBytes("col"), now, Bytes.toBytes("value"));
writer.append(kv);
}
} finally {
writer.close();
}
}
@Test
public void testRowPrefixBloomFilter() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED};
int prefixRowCount = 50;
int suffixRowCount = 10;
int expKeys = 50;
float expErr = 2*prefixRowCount*suffixRowCount*err;
for (int x : new int[]{0,1}) {
// write the file
Path f = new Path(testDir, name.getMethodName());
writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount);
// read the file
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true,
new AtomicInteger(0), true, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
//check basic param
assertEquals(bt[x], reader.getBloomFilterType());
if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) {
assertEquals(prefixLength, reader.getPrefixLength());
assertEquals("null", Bytes.toStringBinary(reader.getDelimiter()));
} else if (bt[x] == BloomType.ROWPREFIX_DELIMITED){
assertEquals(-1, reader.getPrefixLength());
assertEquals(delimiter, Bytes.toStringBinary(reader.getDelimiter()));
}
assertEquals(expKeys, reader.getGeneralBloomFilter().getKeyCount());
StoreFileScanner scanner = getStoreFileScanner(reader);
HStore store = mock(HStore.class);
when(store.getColumnFamilyDescriptor())
.thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
// check false positives rate
int falsePos = 0;
int falseNeg = 0;
for (int i = 0; i < prefixRowCount; i++) { // prefix rows
String prefixRow = String.format(prefixFormatter, i);
for (int j = 0; j < suffixRowCount; j++) { // suffix rows
String startRow = prefixRow + "#" + String.format(suffixFormatter, j);
String stopRow = prefixRow + "#" + String.format(suffixFormatter, j+1);
Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
boolean shouldPrefixRowExist = i % 2 == 0;
if (shouldPrefixRowExist) {
if (!exists) {
falseNeg++;
}
} else {
if (exists) {
falsePos++;
}
}
}
}
for (int i = prefixRowCount; i < prefixRowCount * 2; i++) { // prefix rows
String row = String.format(invalidFormatter, i);
Scan scan = new Scan(new Get(Bytes.toBytes(row)));
boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
boolean shouldPrefixRowExist = i % 2 == 0;
if (shouldPrefixRowExist) {
if (!exists) {
falseNeg++;
}
} else {
if (exists) {
falsePos++;
}
}
}
reader.close(true); // evict because we are about to delete the file
fs.delete(f, true);
assertEquals("False negatives: " + falseNeg, 0, falseNeg);
int maxFalsePos = (int) (2 * expErr);
assertTrue("Too many false positives: " + falsePos
+ " (err=" + err + ", expected no more than " + maxFalsePos + ")",
falsePos <= maxFalsePos);
}
}
@Test
public void testRowPrefixBloomFilterWithGet() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED};
int prefixRowCount = 50;
int suffixRowCount = 10;
int expKeys = 50;
for (int x : new int[]{0,1}) {
// write the file
Path f = new Path(testDir, name.getMethodName());
writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true,
new AtomicInteger(0), true, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = getStoreFileScanner(reader);
HStore store = mock(HStore.class);
when(store.getColumnFamilyDescriptor())
.thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
//Get with valid row style
//prefix row in bloom
String prefixRow = String.format(prefixFormatter, prefixRowCount-2);
String row = prefixRow + "#" + String.format(suffixFormatter, 0);
Scan scan = new Scan(new Get(Bytes.toBytes(row)));
boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
// prefix row not in bloom
prefixRow = String.format(prefixFormatter, prefixRowCount-1);
row = prefixRow + "#" + String.format(suffixFormatter, 0);
scan = new Scan(new Get(Bytes.toBytes(row)));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertFalse(exists);
// Get with invalid row style
// ROWPREFIX_FIXED_LENGTH: the length of the row is less than prefixLength
// ROWPREFIX_DELIMITED: Row does not contain delimiter
// row in bloom
row = String.format(invalidFormatter, prefixRowCount+2);
scan = new Scan(new Get(Bytes.toBytes(row)));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
// row not in bloom
row = String.format(invalidFormatter, prefixRowCount+1);
scan = new Scan(new Get(Bytes.toBytes(row)));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertFalse(exists);
reader.close(true); // evict because we are about to delete the file
fs.delete(f, true);
}
}
@Test
public void testRowPrefixBloomFilterWithScan() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED};
int prefixRowCount = 50;
int suffixRowCount = 10;
int expKeys = 50;
for (int x : new int[]{0,1}) {
// write the file
Path f = new Path(testDir, name.getMethodName());
writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount);
StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true,
new AtomicInteger(0), true, conf);
reader.loadFileInfo();
reader.loadBloomfilter();
StoreFileScanner scanner = getStoreFileScanner(reader);
HStore store = mock(HStore.class);
when(store.getColumnFamilyDescriptor())
.thenReturn(ColumnFamilyDescriptorBuilder.of("family"));
//Scan with valid row style. startRow and stopRow have a common prefix.
//And the length of the common prefix is no less than prefixLength.
//prefix row in bloom
String prefixRow = String.format(prefixFormatter, prefixRowCount-2);
String startRow = prefixRow + "#" + String.format(suffixFormatter, 0);
String stopRow = prefixRow + "#" + String.format(suffixFormatter, 1);
Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
// prefix row not in bloom
prefixRow = String.format(prefixFormatter, prefixRowCount-1);
startRow = prefixRow + "#" + String.format(suffixFormatter, 0);
stopRow = prefixRow + "#" + String.format(suffixFormatter, 1);
scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertFalse(exists);
// There is no common prefix between startRow and stopRow.
prefixRow = String.format(prefixFormatter, prefixRowCount-2);
startRow = prefixRow + "#" + String.format(suffixFormatter, 0);
scan = new Scan().withStartRow(Bytes.toBytes(startRow));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) {
// startRow and stopRow have a common prefix.
// But the length of the common prefix is less than prefixLength.
String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2);
String prefixStopRow = String.format(prefixFormatter, prefixRowCount-1);
startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0);
stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0);
scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
} else if (bt[x] == BloomType.ROWPREFIX_DELIMITED) {
// startRow does not contain delimiter
String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2);
String prefixStopRow = String.format(prefixFormatter, prefixRowCount-2);
startRow = prefixStartRow + String.format(suffixFormatter, 0);
stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0);
scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
// startRow contains delimiter, but stopRow does not have the same prefix as startRow.
prefixStartRow = String.format(prefixFormatter, prefixRowCount-2);
prefixStopRow = String.format(prefixFormatter, prefixRowCount-1);
startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0);
stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0);
scan = new Scan().withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow));
exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE);
assertTrue(exists);
}
reader.close(true); // evict because we are about to delete the file
fs.delete(f, true);
}
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
@ -103,6 +104,8 @@ public class TestScanWithBloomError {
public void setUp() throws IOException{
conf = TEST_UTIL.getConfiguration();
fs = FileSystem.get(conf);
conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
conf.set(BloomFilterUtil.DELIMITER_KEY, "#");
}
@Test

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
@ -142,6 +143,8 @@ public class TestSeekOptimizations {
public void setUp() {
rand = new Random(91238123L);
expectedKVs.clear();
TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#");
}
@Test