HBASE-3776 Add Bloom Filter Support to HFileOutputFormat (Anoop Sam John)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1426406 13f79535-47bb-0310-9956-ffa450edef68
parent f9f0761992
commit 14e3bed584
@@ -49,13 +49,15 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -82,7 +84,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  TimeRangeTracker trt = new TimeRangeTracker();
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
@@ -106,6 +108,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,

       // create a map from column family to the compression algorithm
       final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+      final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);

       String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
       final HFileDataBlockEncoder encoder;
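The record writer builds its per-family maps once per task and reads a data block encoding override from the job configuration under the key shown above. A minimal sketch of supplying that override from a driver; this assumes the value is parsed with DataBlockEncoding.valueOf(), so it must match an enum constant name such as "FAST_DIFF":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EncodingConfSketch {
  public static Configuration withForcedEncoding() {
    Configuration conf = HBaseConfiguration.create();
    // Force one encoding for all families of the bulk-load output;
    // the name must be a valid DataBlockEncoding constant.
    conf.set("hbase.mapreduce.hfileoutputformat.datablock.encoding", "FAST_DIFF");
    return conf;
  }
}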
@@ -166,7 +169,6 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,

         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
-        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
@@ -187,9 +189,9 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
           this.rollRequested = false;
         }

-      /* Create a new HFile.Writer.
+      /* Create a new StoreFile.Writer.
        * @param family
-       * @return A WriterLength, containing a new HFile.Writer.
+       * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
       private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +200,28 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactoryNoCache(conf)
-            .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
-            .withBlockSize(blocksize)
-            .withCompression(compression)
-            .withComparator(KeyValue.KEY_COMPARATOR)
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
             .withDataBlockEncoder(encoder)
             .withChecksumType(HStore.getChecksumType(conf))
             .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
-            .create();
+            .build();

         this.writers.put(family, wl);
         return wl;
       }

-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
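The new getNewWriter swaps the raw HFile writer for a StoreFile.Writer so a bloom filter can be attached per family; the copied Configuration with the block cache sized to zero keeps CacheConfig from allocating a block cache inside each task JVM. A standalone sketch of the same builder calls against this era's API; the block size, output path, and bloom type below are illustrative, not taken from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;

public class WriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // Zero out the block cache in a copied conf so the CacheConfig
    // built from it does not allocate a cache in this JVM.
    Configuration tempConf = new Configuration(conf);
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    StoreFile.Writer writer =
        new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, 65536)
            .withOutputDir(new Path("/tmp/bulkload/cf")) // illustrative path
            .withBloomType(BloomType.ROW)                // bloom on row keys
            .build();
    writer.close();
  }
}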
@@ -221,8 +231,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
               Bytes.toBytes(true));
           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
               Bytes.toBytes(compactionExclude));
-          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
+          w.appendTrackedTimestampsToMetadata();
           w.close();
         }
       }
@@ -241,7 +250,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }

   /**
@@ -359,7 +368,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,

     // Set compression algorithms based on column families
     configureCompression(table, conf);
+    configureBloomType(table, conf);

     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
   }
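With this change, configureIncrementalLoad serializes per-family bloom types alongside compression when the job is set up. A hedged driver sketch against the 0.95-era mapreduce API; the job name, table name, and output path are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulkload"); // Job.getInstance() in later Hadoop
    job.setJarByClass(BulkLoadDriver.class);
    HTable table = new HTable(conf, "mytable"); // illustrative table name
    // Serializes compression, and now also bloom types, from the table's
    // column family descriptors into the job configuration.
    HFileOutputFormat.configureIncrementalLoad(job, table);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}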
@@ -375,25 +385,39 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    * algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
-    for (String familyConf : compressionConf.split("&")) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   *
+   * @param conf
+   * @param confName
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
       String[] familySplit = familyConf.split("=");
       if (familySplit.length != 2) {
         continue;
       }

       try {
-        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
         throw new AssertionError(e);
       }
     }
-    return compressionMap;
+    return confValMap;
   }

   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
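The refactoring makes both maps share one wire format: family names and values are URL-encoded, joined with '=' within a pair and '&' between pairs. A self-contained round-trip sketch using only the JDK; the family names and bloom types are made up:

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Map;
import java.util.TreeMap;

public class FamilyConfRoundTrip {
  public static void main(String[] args) throws Exception {
    // Serialize: "cf1=ROW&cf2=ROWCOL", the shape configureBloomType writes.
    String serialized = URLEncoder.encode("cf1", "UTF-8") + "="
        + URLEncoder.encode("ROW", "UTF-8") + "&"
        + URLEncoder.encode("cf2", "UTF-8") + "="
        + URLEncoder.encode("ROWCOL", "UTF-8");

    // Deserialize, mirroring createFamilyConfValueMap.
    Map<String, String> map = new TreeMap<String, String>();
    for (String familyConf : serialized.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue; // skip malformed entries, as the production code does
      }
      map.put(URLDecoder.decode(familySplit[0], "UTF-8"),
          URLDecoder.decode(familySplit[1], "UTF-8"));
    }
    System.out.println(map); // {cf1=ROW, cf2=ROWCOL}
  }
}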
@@ -423,4 +447,35 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomType = familyDescriptor.getBloomFilterType().toString();
+      if (bloomType == null) {
+        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
 }
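configureBloomType reads whatever bloom type each HColumnDescriptor declares, so per-family blooms in bulk-loaded HFiles are driven entirely by the table schema. A hedged sketch of declaring those bloom types with the era's descriptor API; the table and family names are illustrative:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;

public class BloomTableSetup {
  public static HTableDescriptor describe() {
    HTableDescriptor htd = new HTableDescriptor("mytable");
    HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
    cf1.setBloomFilterType(BloomType.ROW);     // bloom on row keys only
    HColumnDescriptor cf2 = new HColumnDescriptor("cf2");
    cf2.setBloomFilterType(BloomType.ROWCOL);  // bloom on row + qualifier
    htd.addFamily(cf1);
    htd.addFamily(cf2);
    return htd;
  }
}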