From f2f552c2236c40a23b72acf8b9cbe43a0ea3bd44 Mon Sep 17 00:00:00 2001
From: mbertozzi
Date: Wed, 10 Apr 2013 10:08:44 +0000
Subject: [PATCH] HBASE-8313 Add Bloom filter testing for HFileOutputFormat

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1466412 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/StoreFile.java   |   2 +-
 .../hadoop/hbase/HBaseTestingUtility.java      |  54 +++++++++
 .../mapreduce/TestHFileOutputFormat.java       | 105 ++++++------------
 3 files changed, 92 insertions(+), 69 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 053d411c960..ba1058712c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -99,7 +99,7 @@ public class StoreFile {
       Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
 
   /** Bloom filter Type in FileInfo */
-  static final byte[] BLOOM_FILTER_TYPE_KEY =
+  public static final byte[] BLOOM_FILTER_TYPE_KEY =
       Bytes.toBytes("BLOOM_FILTER_TYPE");
 
   /** Delete Family Count in FileInfo */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0a5406f82c7..9e34af5ed5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.RegionStates;
@@ -84,6 +85,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -2661,4 +2663,56 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     };
   }
 
+  /**
+   * Create a set of column descriptors with the combination of compression,
+   * encoding, bloom codecs available.
+   * @return the list of column descriptors
+   */
+  public static List<HColumnDescriptor> generateColumnDescriptors() {
+    return generateColumnDescriptors("");
+  }
+
+  /**
+   * Create a set of column descriptors with the combination of compression,
+   * encoding, bloom codecs available.
+   * @param prefix family names prefix
+   * @return the list of column descriptors
+   */
+  public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
+    List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
+    long familyId = 0;
+    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
+      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
+        for (BloomType bloomType: BloomType.values()) {
+          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
+          HColumnDescriptor htd = new HColumnDescriptor(name);
+          htd.setCompressionType(compressionType);
+          htd.setDataBlockEncoding(encodingType);
+          htd.setBloomFilterType(bloomType);
+          htds.add(htd);
+          familyId++;
+        }
+      }
+    }
+    return htds;
+  }
+
+  /**
+   * Get supported compression algorithms.
+   * @return supported compression algorithms.
+   */
+  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
+    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
+    List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
+    for (String algoName : allAlgos) {
+      try {
+        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
+        algo.getCompressor();
+        supportedAlgos.add(algo);
+      } catch (Throwable t) {
+        // this algo is not available
+      }
+    }
+    return supportedAlgos.toArray(new Compression.Algorithm[0]);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index aa3b6dd2df1..caad539ae10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
@@ -65,6 +67,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -542,30 +546,25 @@ public class TestHFileOutputFormat {
     return familyToCompression;
   }
 
+
   /**
-   * Test that {@link HFileOutputFormat} RecordWriter uses compression settings
-   * from the column family descriptor
+   * Test that {@link HFileOutputFormat} RecordWriter uses compression and
+   * bloom filter settings from the column family descriptor
    */
   @Test
-  public void testColumnFamilyCompression() throws Exception {
+  public void testColumnFamilySettings() throws Exception {
     Configuration conf = new Configuration(this.util.getConfiguration());
     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
     TaskAttemptContext context = null;
-    Path dir =
-        util.getDataTestDirOnTestFS("testColumnFamilyCompression");
+    Path dir = util.getDataTestDir("testColumnFamilySettings");
+    // Setup table descriptor
     HTable table = Mockito.mock(HTable.class);
-
-    Map<String, Compression.Algorithm> configuredCompression =
-        new HashMap<String, Compression.Algorithm>();
-    Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
-
-    int familyIndex = 0;
-    for (byte[] family : FAMILIES) {
-      configuredCompression.put(Bytes.toString(family),
-          supportedAlgos[familyIndex++ % supportedAlgos.length]);
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    Mockito.doReturn(htd).when(table).getTableDescriptor();
+    for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
+      htd.addFamily(hcd);
     }
-    setupMockColumnFamilies(table, configuredCompression);
 
     // set up the table to return some mock keys
     setupMockStartKeys(table);
 
@@ -576,7 +575,7 @@
     // pollutes the GZip codec pool with an incompatible compressor.
     conf.set("io.seqfile.compression.type", "NONE");
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
-    job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilyCompression"));
+    job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
     setupRandomGeneratorMapper(job);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     FileOutputFormat.setOutputPath(job, dir);
@@ -585,75 +584,45 @@
       writer = hof.getRecordWriter(context);
 
       // write out random rows
-      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
       writer.close(context);
 
       // Make sure that a directory was created for every CF
-      FileSystem fileSystem = dir.getFileSystem(conf);
+      FileSystem fs = dir.getFileSystem(conf);
 
       // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
       hof.getOutputCommitter(context).commitJob(context);
 
-      for (byte[] family : FAMILIES) {
-        String familyStr = new String(family);
-        boolean found = false;
-        for (FileStatus f : fileSystem.listStatus(dir)) {
+      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
+      assertEquals(htd.getFamilies().size(), families.length);
+      for (FileStatus f : families) {
+        String familyStr = f.getPath().getName();
+        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
+        // verify that the compression on this file matches the configured
+        // compression
+        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
+        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf));
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
 
-          if (Bytes.toString(family).equals(f.getPath().getName())) {
-            // we found a matching directory
-            found = true;
-
-            // verify that the compression on this file matches the configured
-            // compression
-            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
-            Reader reader = HFile.createReader(fileSystem, dataFilePath,
-                new CacheConfig(conf));
-            reader.loadFileInfo();
-            assertEquals("Incorrect compression used for column family " + familyStr
-                + "(reader: " + reader + ")",
-                configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
-            break;
-          }
-        }
-
-        if (!found) {
-          fail("HFile for column family " + familyStr + " not found");
-        }
+        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
+        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
+        assertEquals("Incorrect bloom filter used for column family " + familyStr +
+          "(reader: " + reader + ")",
+          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
+        assertEquals("Incorrect compression used for column family " + familyStr +
+          "(reader: " + reader + ")", hcd.getCompression(), reader.getCompressionAlgorithm());
       }
-
     } finally {
      dir.getFileSystem(conf).delete(dir, true);
     }
   }
-
-  /**
-   * @return
-   */
-  private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
-    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
-    List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
-
-    for (String algoName : allAlgos) {
-      try {
-        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
-        algo.getCompressor();
-        supportedAlgos.add(algo);
-      } catch (Throwable t) {
-        // this algo is not available
-      }
-    }
-
-    return supportedAlgos.toArray(new Compression.Algorithm[0]);
-  }
-
-
   /**
    * Write random values to the writer assuming a table created using
    * {@link #FAMILIES} as column family descriptors
    */
-  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer, TaskAttemptContext context,
-      int numRows)
+  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
+      TaskAttemptContext context, Set<byte[]> families, int numRows)
       throws IOException, InterruptedException {
     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
     int valLength = 10;
@@ -669,7 +638,7 @@
       random.nextBytes(valBytes);
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
-      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+      for (byte[] family : families) {
         KeyValue kv = new KeyValue(keyBytes, family,
             PerformanceEvaluation.QUALIFIER_NAME, valBytes);
         writer.write(key, kv);
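
Note (illustrative sketch, not part of the patch): the new public HBaseTestingUtility.generateColumnDescriptors(prefix) helper could be reused from another test to build a table descriptor with one column family per supported compression codec crossed with every data block encoding and bloom filter type, mirroring what testColumnFamilySettings does above. The class name ColumnDescriptorSketch, the "example" prefix, and the "all_combinations" table name are made up for this example.

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

public class ColumnDescriptorSketch {
  /** Builds a descriptor with one family per (compression, encoding, bloom type) combination. */
  public static HTableDescriptor buildAllCombinationsTable() {
    HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("all_combinations"));
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors("example")) {
      // Each generated descriptor already carries its compression, encoding and bloom settings,
      // restricted to compression codecs that are actually loadable in this JVM.
      htd.addFamily(hcd);
    }
    return htd;
  }
}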
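A second sketch (also not part of the patch, under the same assumptions): reading back the bloom filter type recorded in an HFile's file info through the now-public StoreFile.BLOOM_FILTER_TYPE_KEY, which is the check the updated test performs. The helper class name and the explicit reader close are illustrative choices; the test above simply lets the reader go out of scope.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;

public class BloomTypeSketch {
  /** Returns the bloom filter type stored in the given HFile, or NONE if no entry is present. */
  public static BloomType readBloomType(Configuration conf, Path hfilePath) throws IOException {
    FileSystem fs = hfilePath.getFileSystem(conf);
    HFile.Reader reader = HFile.createReader(fs, hfilePath, new CacheConfig(conf));
    try {
      Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
      byte[] bloom = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
      // Files written without a bloom filter carry no entry; treat that as NONE,
      // exactly as the updated test does.
      return bloom == null ? BloomType.NONE : BloomType.valueOf(Bytes.toString(bloom));
    } finally {
      reader.close();
    }
  }
}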