HBASE-10323 Auto detect data block encoding in HFileOutputFormat

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1559771 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2014-01-20 16:33:42 +00:00
parent 8eeb7f3d9a
commit e93012ca83
2 changed files with 453 additions and 89 deletions

org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;

+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
@@ -29,7 +30,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -67,7 +68,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
  * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles. Calling write(null,null) will forceably roll
+ * attribute on created hfiles. Calling write(null,null) will forcibly roll
  * all HFiles being written.
  * <p>
  * Using this class as part of a MapReduce job is best done
@@ -78,11 +79,26 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 @InterfaceStability.Stable
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
-  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
-  private static final String DATABLOCK_ENCODING_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
+
+  // The following constants are private since these are used by
+  // HFileOutputFormat to internally transfer data between job setup and
+  // reducer run using conf.
+  // These should not be changed by the client.
+  private static final String COMPRESSION_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.compression";
+  private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.bloomtype";
+  private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.blocksize";
+  private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+
+  // This constant is public since the client can modify this when setting
+  // up their conf object and thus refer to this symbol.
+  // It is present for backwards compatibility reasons. Use it only to
+  // override the auto-detection of datablock encoding.
+  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";

   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
       throws IOException, InterruptedException {
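
For a client of the new public key, the override path looks roughly like the sketch below. The wiring and class name are illustrative only and not part of this commit; the constant and DataBlockEncoding enum are from the patch above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class EncodingOverrideExample {
  public static void configure(Job job) {
    Configuration conf = job.getConfiguration();
    // Force FAST_DIFF for every HFile written by this job, bypassing the
    // per-family auto-detection that this commit introduces.
    conf.set(HFileOutputFormat.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY,
        DataBlockEncoding.FAST_DIFF.name());
  }
}

DataBlockEncoding.name() round-trips cleanly because the reducer side parses the value with DataBlockEncoding.valueOf(), as the next hunk shows.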
@@ -95,17 +111,27 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
     // Invented config. Add to hbase-*.xml if other than default compression.
-    final String defaultCompression = conf.get("hfile.compression",
+    final String defaultCompressionStr = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
+    final Algorithm defaultCompression = AbstractHFileWriter
+        .compressionByName(defaultCompressionStr);
     final boolean compactionExclude = conf.getBoolean(
         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

     // create a map from column family to the compression algorithm
-    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
-    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);
-    final String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
+    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+    final Map<byte[], DataBlockEncoding> datablockEncodingMap
+        = createFamilyDataBlockEncodingMap(conf);
+    final DataBlockEncoding overriddenEncoding;
+    if (dataBlockEncodingStr != null) {
+      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+    } else {
+      overriddenEncoding = null;
+    }

     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
@@ -180,26 +206,23 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
           throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
-        String compression = compressionMap.get(family);
+        Algorithm compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        String bloomTypeStr = bloomTypeMap.get(family);
-        BloomType bloomType = BloomType.NONE;
-        if (bloomTypeStr != null) {
-          bloomType = BloomType.valueOf(bloomTypeStr);
-        }
-        String blockSizeString = blockSizeMap.get(family);
-        int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
-            : Integer.parseInt(blockSizeString);
+        BloomType bloomType = bloomTypeMap.get(family);
+        bloomType = bloomType == null ? BloomType.NONE : bloomType;
+        Integer blockSize = blockSizeMap.get(family);
+        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+        DataBlockEncoding encoding = overriddenEncoding;
+        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
         Configuration tempConf = new Configuration(conf);
         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
         HFileContextBuilder contextBuilder = new HFileContextBuilder()
-            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withCompression(compression)
             .withChecksumType(HStore.getChecksumType(conf))
             .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
             .withBlockSize(blockSize);
-        if(dataBlockEncodingStr != null) {
-          contextBuilder.withDataBlockEncoding(DataBlockEncoding.valueOf(dataBlockEncodingStr));
-        }
+        contextBuilder.withDataBlockEncoding(encoding);
         HFileContext hFileContext = contextBuilder.build();

         wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
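
The three ternaries above encode a small precedence chain. Restated as a standalone helper for clarity (an illustrative sketch only, not code from this commit):

import java.util.Map;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

final class EncodingPrecedence {
  // An explicit job-level override beats the table-derived per-family
  // encoding, which in turn falls back to DataBlockEncoding.NONE.
  static DataBlockEncoding resolve(DataBlockEncoding override,
      Map<byte[], DataBlockEncoding> familyMap, byte[] family) {
    DataBlockEncoding encoding = override;
    encoding = encoding == null ? familyMap.get(family) : encoding;
    return encoding == null ? DataBlockEncoding.NONE : encoding;
  }
}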
@@ -349,62 +372,102 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     configureCompression(table, conf);
     configureBloomType(table, conf);
     configureBlockSize(table, conf);
+    configureDataBlockEncoding(table, conf);

     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
     LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
   }

-  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
-    StringBuilder blockSizeConfigValue = new StringBuilder();
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    if(tableDescriptor == null){
-      // could happen with mock table instance
-      return;
-    }
-    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-    int i = 0;
-    for (HColumnDescriptor familyDescriptor : families) {
-      if (i++ > 0) {
-        blockSizeConfigValue.append('&');
-      }
-      blockSizeConfigValue.append(URLEncoder.encode(
-          familyDescriptor.getNameAsString(), "UTF-8"));
-      blockSizeConfigValue.append('=');
-      blockSizeConfigValue.append(URLEncoder.encode(
-          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-    }
-    // Get rid of the last ampersand
-    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
-  }
-
   /**
-   * Run inside the task to deserialize column family to compression algorithm
-   * map from the
-   * configuration.
-   *
-   * Package-private for unit tests only.
+   * Runs inside the task to deserialize column family to compression algorithm
+   * map from the configuration.
    *
-   * @return a map from column family to the name of the configured compression
-   *         algorithm
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured compression algorithm
    */
-  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  @VisibleForTesting
+  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        COMPRESSION_FAMILIES_CONF_KEY);
+    Map<byte[], Algorithm> compressionMap =
+        new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
+      compressionMap.put(e.getKey(), algorithm);
+    }
+    return compressionMap;
   }

-  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  /**
+   * Runs inside the task to deserialize column family to bloom filter type
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured bloom filter type
+   */
+  @VisibleForTesting
+  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        BLOOM_TYPE_FAMILIES_CONF_KEY);
+    Map<byte[], BloomType> bloomTypeMap =
+        new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      BloomType bloomType = BloomType.valueOf(e.getValue());
+      bloomTypeMap.put(e.getKey(), bloomType);
+    }
+    return bloomTypeMap;
   }

-  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
-    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  /**
+   * Runs inside the task to deserialize column family to block size
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured block size
+   */
+  @VisibleForTesting
+  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        BLOCK_SIZE_FAMILIES_CONF_KEY);
+    Map<byte[], Integer> blockSizeMap =
+        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Integer blockSize = Integer.parseInt(e.getValue());
+      blockSizeMap.put(e.getKey(), blockSize);
+    }
+    return blockSizeMap;
+  }
+
+  /**
+   * Runs inside the task to deserialize column family to data block encoding
+   * type map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured data block encoding
+   *         for the family
+   */
+  @VisibleForTesting
+  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+      Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+    Map<byte[], DataBlockEncoding> encoderMap =
+        new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
+    }
+    return encoderMap;
   }

   /**
    * Run inside the task to deserialize column family to given conf value map.
    *
-   * @param conf
-   * @param confName
+   * @param conf to read the serialized values from
+   * @param confName conf key to read from the configuration
    * @return a map of column family to the given configuration value
    */
   private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
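
All four per-family maps travel through the job configuration in the same wire format: URL-encoded family=value pairs joined by '&', which is why family names containing '&' or '=' survive the round trip. A minimal sketch of that format, assuming only what the patch shows above (the class and method names here are hypothetical):

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class FamilyConfValueFormat {
  // Serialize, mirroring the configure* methods: e.g. "cf1=SNAPPY&cf2=GZ",
  // with both key and value URL-encoded.
  static String serialize(Map<String, String> familyToValue)
      throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : familyToValue.entrySet()) {
      if (sb.length() > 0) sb.append('&');
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      sb.append('=');
      sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  // Deserialize, mirroring createFamilyConfValueMap() above. Splitting on
  // '&' and '=' is safe because encoded keys contain neither character.
  static Map<String, String> deserialize(String confValue)
      throws UnsupportedEncodingException {
    Map<String, String> result = new LinkedHashMap<String, String>();
    for (String pair : confValue.split("&")) {
      if (pair.isEmpty()) continue;
      String[] kv = pair.split("=", 2);
      result.put(URLDecoder.decode(kv[0], "UTF-8"),
          kv.length > 1 ? URLDecoder.decode(kv[1], "UTF-8") : "");
    }
    return result;
  }
}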
@@ -449,13 +512,14 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
    *
-   * Package-private for unit tests only.
-   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
    * @throws IOException
    *           on failure to read column family descriptors
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+  @VisibleForTesting
   static void configureCompression(HTable table, Configuration conf) throws IOException {
     StringBuilder compressionConfigValue = new StringBuilder();
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -474,16 +538,52 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
       compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
     }
     // Get rid of the last ampersand
-    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
   }

+  /**
+   * Serialize column family to block size map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @VisibleForTesting
+  static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(
+          familyDescriptor.getNameAsString(), "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(
+          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
   /**
    * Serialize column family to bloom type map to configuration.
    * Invoked while configuring the MR job for incremental load.
    *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
    * @throws IOException
    *           on failure to read column family descriptors
    */
+  @VisibleForTesting
   static void configureBloomType(HTable table, Configuration conf) throws IOException {
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
     if (tableDescriptor == null) {
@@ -505,6 +605,44 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
       }
       bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
     }
-    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
   }

+  /**
+   * Serialize column family to data block encoding map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @VisibleForTesting
+  static void configureDataBlockEncoding(HTable table,
+      Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        dataBlockEncodingConfigValue.append('&');
+      }
+      dataBlockEncodingConfigValue.append(
+          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      dataBlockEncodingConfigValue.append('=');
+      DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+      if (encoding == null) {
+        encoding = DataBlockEncoding.NONE;
+      }
+      dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
+          "UTF-8"));
+    }
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+        dataBlockEncodingConfigValue.toString());
+  }
 }
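
End to end, a bulk-load job should not need to touch any of these keys itself: configureIncrementalLoad invokes the four configure* methods shown in this diff, and the record writer auto-detects the per-family settings on the reduce side. A rough usage sketch, assuming the configureIncrementalLoad(Job, HTable) entry point of this era; the job and table names are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadSetup {
  public static Job setup() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "usertable" is an illustrative table name.
    Job job = Job.getInstance(conf, "hfile-bulkload");
    HTable table = new HTable(conf, "usertable");
    // Serializes per-family compression, bloom type, block size and, with
    // this patch, data block encoding into the job configuration.
    HFileOutputFormat.configureIncrementalLoad(job, table);
    return job;
  }
}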

org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

@@ -33,9 +33,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-
 import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +62,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
@@ -474,36 +473,40 @@ public class TestHFileOutputFormat {
   }

   /**
-   * Test for
-   * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
-   * that the compression map is correctly deserialized from configuration
+   * Test for {@link HFileOutputFormat#configureCompression(HTable, Configuration)}
+   * and {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}.
+   * Tests that the compression map is correctly serialized into
+   * and deserialized from configuration
    *
    * @throws IOException
    */
   @Test
-  public void testCreateFamilyCompressionMap() throws IOException {
+  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
     for (int numCfs = 0; numCfs <= 3; numCfs++) {
       Configuration conf = new Configuration(this.util.getConfiguration());
-      Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+      Map<String, Compression.Algorithm> familyToCompression =
+          getMockColumnFamiliesForCompression(numCfs);
       HTable table = Mockito.mock(HTable.class);
-      setupMockColumnFamilies(table, familyToCompression);
+      setupMockColumnFamiliesForCompression(table, familyToCompression);

       HFileOutputFormat.configureCompression(table, conf);

       // read back family specific compression setting from the configuration
-      Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf);
+      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
+          .createFamilyCompressionMap(conf);

       // test that we have a value for all column families that matches with the
       // used mock values
       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
-        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
-            .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+        assertEquals("Compression configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
       }
     }
   }

-  private void setupMockColumnFamilies(HTable table,
-      Map<String, Compression.Algorithm> familyToCompression) throws IOException
-  {
+  private void setupMockColumnFamiliesForCompression(HTable table,
+      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
@@ -515,21 +518,12 @@ public class TestHFileOutputFormat {
     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
   }

-  private void setupMockStartKeys(HTable table) throws IOException {
-    byte[][] mockKeys = new byte[][] {
-        HConstants.EMPTY_BYTE_ARRAY,
-        Bytes.toBytes("aaa"),
-        Bytes.toBytes("ggg"),
-        Bytes.toBytes("zzz")
-    };
-    Mockito.doReturn(mockKeys).when(table).getStartKeys();
-  }
-
   /**
    * @return a map from column family names to compression algorithms for
    * testing column family compression. Column family names have special characters
    */
-  private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
+  private Map<String, Compression.Algorithm>
+      getMockColumnFamiliesForCompression(int numCfs) {
     Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
     // use column family names having special characters
     if (numCfs-- > 0) {
@@ -548,6 +542,238 @@ public class TestHFileOutputFormat {
   }

+  /**
+   * Test for {@link HFileOutputFormat#configureBloomType(HTable, Configuration)}
+   * and {@link HFileOutputFormat#createFamilyBloomTypeMap(Configuration)}.
+   * Tests that the bloom filter type map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 2; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, BloomType> familyToBloomType =
+          getMockColumnFamiliesForBloomType(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
+      HFileOutputFormat.configureBloomType(table, conf);
+
+      // read back family specific bloom filter type settings from the
+      // configuration
+      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
+          HFileOutputFormat.createFamilyBloomTypeMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
+        assertEquals("BloomType configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForBloomType(HTable table,
+      Map<String, BloomType> familyToBloomType) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setBloomFilterType(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to bloom filter types for
+   * testing column family bloom filter settings. Column family names have
+   * special characters
+   */
+  private Map<String, BloomType>
+      getMockColumnFamiliesForBloomType(int numCfs) {
+    Map<String, BloomType> familyToBloomType = new HashMap<String, BloomType>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
+    }
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
+    }
+    if (numCfs-- > 0) {
+      familyToBloomType.put("Family3", BloomType.NONE);
+    }
+    return familyToBloomType;
+  }
+
+  /**
+   * Test for {@link HFileOutputFormat#configureBlockSize(HTable, Configuration)}
+   * and {@link HFileOutputFormat#createFamilyBlockSizeMap(Configuration)}.
+   * Tests that the block size map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, Integer> familyToBlockSize =
+          getMockColumnFamiliesForBlockSize(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
+      HFileOutputFormat.configureBlockSize(table, conf);
+
+      // read back family specific block size settings from the
+      // configuration
+      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
+          HFileOutputFormat.createFamilyBlockSizeMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
+        assertEquals("BlockSize configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForBlockSize(HTable table,
+      Map<String, Integer> familyToBlockSize) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setBlocksize(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to block sizes for
+   * testing column family block size settings. Column family names have
+   * special characters
+   */
+  private Map<String, Integer>
+      getMockColumnFamiliesForBlockSize(int numCfs) {
+    Map<String, Integer> familyToBlockSize = new HashMap<String, Integer>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family1!@#!@#&", 1234);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
+    }
+    if (numCfs-- > 0) {
+      familyToBlockSize.put("Family3", 0);
+    }
+    return familyToBlockSize;
+  }
+
+  /**
+   * Test for {@link HFileOutputFormat#configureDataBlockEncoding(HTable,
+   * Configuration)} and
+   * {@link HFileOutputFormat#createFamilyDataBlockEncodingMap(Configuration)}.
+   * Tests that the data block encoding map is correctly serialized into
+   * and deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
+          getMockColumnFamiliesForDataBlockEncoding(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
+      HFileOutputFormat.configureDataBlockEncoding(table, conf);
+
+      // read back family specific data block encoding settings from the
+      // configuration
+      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
+          HFileOutputFormat.createFamilyDataBlockEncodingMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
+        assertEquals("DataBlockEncoding configuration incorrect for column family:"
+            + entry.getKey(), entry.getValue(),
+            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamiliesForDataBlockEncoding(HTable table,
+      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
+          .setMaxVersions(1)
+          .setDataBlockEncoding(entry.getValue())
+          .setBlockCacheEnabled(false)
+          .setTimeToLive(0));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * @return a map from column family names to data block encodings for
+   * testing column family data block encoding settings. Column family names
+   * have special characters
+   */
+  private Map<String, DataBlockEncoding>
+      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
+    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
+        new HashMap<String, DataBlockEncoding>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
+          DataBlockEncoding.FAST_DIFF);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
+          DataBlockEncoding.PREFIX);
+    }
+    if (numCfs-- > 0) {
+      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
+    }
+    return familyToDataBlockEncoding;
+  }
+
+  private void setupMockStartKeys(HTable table) throws IOException {
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+  }
+
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor