HBASE-8313 Add Bloom filter testing for HFileOutputFormat
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1466412 13f79535-47bb-0310-9956-ffa450edef68
commit f2f552c223 (parent da58fb4a99)
StoreFile.java

@@ -99,7 +99,7 @@ public class StoreFile {
       Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
 
   /** Bloom filter Type in FileInfo */
-  static final byte[] BLOOM_FILTER_TYPE_KEY =
+  public static final byte[] BLOOM_FILTER_TYPE_KEY =
       Bytes.toBytes("BLOOM_FILTER_TYPE");
 
   /** Delete Family Count in FileInfo */
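Making BLOOM_FILTER_TYPE_KEY public lets test code read back the bloom filter type that the writer recorded in an HFile's file info, which is what the updated TestHFileOutputFormat does further down. A minimal sketch of that read path, assuming the same reader calls this patch already uses (the class and method names below are illustrative only, not part of the commit):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper, not part of the patch.
public class BloomTypeReader {
  /** Read the bloom filter type recorded in an HFile, defaulting to NONE. */
  static BloomType readBloomType(FileSystem fs, Path hfile, Configuration conf)
      throws IOException {
    HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf));
    try {
      Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
      // Files written without a bloom filter carry no entry under this key.
      byte[] bloom = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
      return bloom == null ? BloomType.NONE : BloomType.valueOf(Bytes.toString(bloom));
    } finally {
      reader.close();
    }
  }
}

The null check mirrors the new assertion in the test: an HFile produced with bloom type NONE simply has no entry under the key, so the test falls back to "NONE" before comparing.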
HBaseTestingUtility.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionStates;
@@ -84,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -2661,4 +2663,56 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     };
   }
 
+  /**
+   * Create a set of column descriptors with the combination of compression,
+   * encoding, bloom codecs available.
+   * @return the list of column descriptors
+   */
+  public static List<HColumnDescriptor> generateColumnDescriptors() {
+    return generateColumnDescriptors("");
+  }
+
+  /**
+   * Create a set of column descriptors with the combination of compression,
+   * encoding, bloom codecs available.
+   * @param prefix family names prefix
+   * @return the list of column descriptors
+   */
+  public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
+    List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
+    long familyId = 0;
+    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
+      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
+        for (BloomType bloomType: BloomType.values()) {
+          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
+          HColumnDescriptor htd = new HColumnDescriptor(name);
+          htd.setCompressionType(compressionType);
+          htd.setDataBlockEncoding(encodingType);
+          htd.setBloomFilterType(bloomType);
+          htds.add(htd);
+          familyId++;
+        }
+      }
+    }
+    return htds;
+  }
+
+  /**
+   * Get supported compression algorithms.
+   * @return supported compression algorithms.
+   */
+  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
+    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
+    List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
+    for (String algoName : allAlgos) {
+      try {
+        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
+        algo.getCompressor();
+        supportedAlgos.add(algo);
+      } catch (Throwable t) {
+        // this algo is not available
+      }
+    }
+    return supportedAlgos.toArray(new Compression.Algorithm[0]);
+  }
 }
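generateColumnDescriptors gives tests a one-liner for covering every available compression/encoding/bloom combination, one column family per combination, while getSupportedCompressionAlgorithms silently drops codecs whose native libraries cannot be loaded in the test environment. A small sketch of how a test might consume the helper to build a table descriptor, following the same pattern testColumnFamilySettings uses below (the class and method names are illustrative only):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper, not part of the patch.
public class AllCodecTableBuilder {
  /** Build a table descriptor with one family per compression/encoding/bloom combination. */
  static HTableDescriptor buildTableDescriptor(String tableName) {
    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(tableName));
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      // each descriptor already carries one combination of codec, encoding and bloom type
      htd.addFamily(hcd);
    }
    return htd;
  }
}

Because unsupported codecs are filtered out up front, the resulting family list only covers settings the test machine can actually write, which keeps the test from failing on hosts without, for example, native LZO or Snappy.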
TestHFileOutputFormat.java

@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -65,6 +67,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
@@ -542,30 +546,25 @@ public class TestHFileOutputFormat {
     return familyToCompression;
   }
 
   /**
-   * Test that {@link HFileOutputFormat} RecordWriter uses compression settings
-   * from the column family descriptor
+   * Test that {@link HFileOutputFormat} RecordWriter uses compression and
+   * bloom filter settings from the column family descriptor
    */
   @Test
-  public void testColumnFamilyCompression() throws Exception {
+  public void testColumnFamilySettings() throws Exception {
     Configuration conf = new Configuration(this.util.getConfiguration());
     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
     TaskAttemptContext context = null;
-    Path dir =
-        util.getDataTestDirOnTestFS("testColumnFamilyCompression");
+    Path dir = util.getDataTestDir("testColumnFamilySettings");
 
+    // Setup table descriptor
     HTable table = Mockito.mock(HTable.class);
 
-    Map<String, Compression.Algorithm> configuredCompression =
-      new HashMap<String, Compression.Algorithm>();
-    Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
-
-    int familyIndex = 0;
-    for (byte[] family : FAMILIES) {
-      configuredCompression.put(Bytes.toString(family),
-          supportedAlgos[familyIndex++ % supportedAlgos.length]);
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    Mockito.doReturn(htd).when(table).getTableDescriptor();
+    for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
+      htd.addFamily(hcd);
     }
-    setupMockColumnFamilies(table, configuredCompression);
 
     // set up the table to return some mock keys
     setupMockStartKeys(table);
@@ -576,7 +575,7 @@ public class TestHFileOutputFormat {
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
-    job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilyCompression"));
+    job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     FileOutputFormat.setOutputPath(job, dir);
@@ -585,75 +584,45 @@ public class TestHFileOutputFormat {
       writer = hof.getRecordWriter(context);
 
       // write out random rows
-      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
       writer.close(context);
 
       // Make sure that a directory was created for every CF
-      FileSystem fileSystem = dir.getFileSystem(conf);
+      FileSystem fs = dir.getFileSystem(conf);
 
       // commit so that the filesystem has one directory per column family
       hof.getOutputCommitter(context).commitTask(context);
       hof.getOutputCommitter(context).commitJob(context);
-      for (byte[] family : FAMILIES) {
-        String familyStr = new String(family);
-        boolean found = false;
-        for (FileStatus f : fileSystem.listStatus(dir)) {
+      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
+      assertEquals(htd.getFamilies().size(), families.length);
+      for (FileStatus f : families) {
+        String familyStr = f.getPath().getName();
+        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
+        // verify that the compression on this file matches the configured
+        // compression
+        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
+        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf));
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 
-          if (Bytes.toString(family).equals(f.getPath().getName())) {
-            // we found a matching directory
-            found = true;
-
-            // verify that the compression on this file matches the configured
-            // compression
-            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
-            Reader reader = HFile.createReader(fileSystem, dataFilePath,
-                new CacheConfig(conf));
-            reader.loadFileInfo();
-            assertEquals("Incorrect compression used for column family " + familyStr
-                + "(reader: " + reader + ")",
-                configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
-            break;
-          }
-        }
-
-        if (!found) {
-          fail("HFile for column family " + familyStr + " not found");
-        }
+        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
+        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
+        assertEquals("Incorrect bloom filter used for column family " + familyStr +
+          "(reader: " + reader + ")",
+          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
+        assertEquals("Incorrect compression used for column family " + familyStr +
+          "(reader: " + reader + ")", hcd.getCompression(), reader.getCompressionAlgorithm());
       }
 
     } finally {
       dir.getFileSystem(conf).delete(dir, true);
     }
   }
 
-  /**
-   * @return
-   */
-  private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
-    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
-    List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
-
-    for (String algoName : allAlgos) {
-      try {
-        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
-        algo.getCompressor();
-        supportedAlgos.add(algo);
-      } catch (Throwable t) {
-        // this algo is not available
-      }
-    }
-
-    return supportedAlgos.toArray(new Compression.Algorithm[0]);
-  }
-
   /**
    * Write random values to the writer assuming a table created using
    * {@link #FAMILIES} as column family descriptors
    */
-  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer, TaskAttemptContext context,
-      int numRows)
+  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
+      TaskAttemptContext context, Set<byte[]> families, int numRows)
       throws IOException, InterruptedException {
     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
     int valLength = 10;
@@ -669,7 +638,7 @@ public class TestHFileOutputFormat {
       random.nextBytes(valBytes);
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
-      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+      for (byte[] family : families) {
        KeyValue kv = new KeyValue(keyBytes, family,
            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
        writer.write(key, kv);