HBASE-9119 hbase.mapreduce.hfileoutputformat.blocksize should configure with blocksize of a table
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1509835 13f79535-47bb-0310-9956-ffa450edef68
parent 34c8e0ab16
commit a1cb0fc0e8
@@ -611,18 +611,6 @@ possible configurations would overwhelm and obscure the important.
     <description>
       The default thread pool size if parallel-seeking feature enabled.</description>
   </property>
-  <property>
-    <name>hbase.mapreduce.hfileoutputformat.blocksize</name>
-    <value>65536</value>
-    <description>The mapreduce HFileOutputFormat writes storefiles/hfiles.
-    This is the minimum hfile blocksize to emit. Usually in hbase, writing
-    hfiles, the blocksize is gotten from the table schema (HColumnDescriptor)
-    but in the mapreduce outputformat context, we don't have access to the
-    schema so get blocksize from Configuration. The smaller you make
-    the blocksize, the bigger your index and the less you fetch on a
-    random-access. Set the blocksize down if you have small cells and want
-    faster random-access of individual cells.</description>
-  </property>
   <property>
     <name>hfile.block.cache.size</name>
     <value>0.4</value>
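With the property removed from hbase-default.xml, the block size that HFileOutputFormat honors is the one declared per column family on the table schema. A minimal sketch of declaring that block size, not part of this commit; the table and family names are hypothetical:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

public class BlockSizeSchemaSketch {
  public static void main(String[] args) {
    // Hypothetical table and family names.
    HTableDescriptor tableDescriptor = new HTableDescriptor("mytable");
    HColumnDescriptor familyDescriptor = new HColumnDescriptor("cf");
    familyDescriptor.setBlocksize(8 * 1024);  // bytes; the HBase default is 65536
    tableDescriptor.addFamily(familyDescriptor);
    System.out.println(familyDescriptor.getBlocksize());  // 8192
  }
}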
@@ -83,6 +83,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+  private static final String BLOCK_SIZE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";

   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -94,8 +95,6 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     // These configs. are from hbase-*.xml
     final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
-    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
-        HConstants.DEFAULT_BLOCKSIZE);
     // Invented config. Add to hbase-*.xml if other than default compression.
     final String defaultCompression = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
@@ -105,6 +104,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
     final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
+    final Map<byte[], String> blockSizeMap = createFamilyBlockSizeMap(conf);

     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
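These per-family maps are keyed by the family name as byte[]. Java arrays use identity-based equals/hashCode, so lookups like blockSizeMap.get(family) only work when the map compares array contents; a minimal sketch of that requirement (not this class's actual helper, which is createFamilyConfValueMap):

import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.util.Bytes;

public class FamilyMapKeySketch {
  public static void main(String[] args) {
    // A HashMap keyed by byte[] would never find anything; a TreeMap with
    // a content comparator does. The "cf"/"8192" entry is hypothetical.
    Map<byte[], String> blockSizeMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    blockSizeMap.put(Bytes.toBytes("cf"), "8192");
    System.out.println(blockSizeMap.get(Bytes.toBytes("cf")));  // 8192
  }
}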
@@ -201,9 +201,12 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
         if (bloomTypeStr != null) {
           bloomType = BloomType.valueOf(bloomTypeStr);
         }
+        String blockSizeString = blockSizeMap.get(family);
+        int blockSize = blockSizeString == null ? HConstants.DEFAULT_BLOCKSIZE
+            : Integer.parseInt(blockSizeString);
         Configuration tempConf = new Configuration(conf);
         tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
            .withOutputDir(familydir)
            .withCompression(AbstractHFileWriter.compressionByName(compression))
            .withBloomType(bloomType)
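The unchanged context lines in this hunk copy the job Configuration and zero out the block cache before each writer is built. A sketch of why, assuming only that CacheConfig reads hfile.block.cache.size; this is background to the hunk, not code from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class WriterCacheConfSketch {
  // Reduce tasks only write hfiles and never read them back, so a block
  // cache would waste task heap. Mutating a copy leaves the job's own
  // Configuration untouched.
  static CacheConfig writerCacheConf(Configuration conf) {
    Configuration tempConf = new Configuration(conf);  // defensive copy
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    return new CacheConfig(tempConf);
  }
}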
@@ -353,12 +356,36 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     // Set compression algorithms based on column families
     configureCompression(table, conf);
     configureBloomType(table, conf);
+    configureBlockSize(table, conf);

     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.initCredentials(job);
     LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
   }

+  private static void configureBlockSize(HTable table, Configuration conf) throws IOException {
+    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        blockSizeConfigValue.append('&');
+      }
+      blockSizeConfigValue.append(URLEncoder.encode(
+          familyDescriptor.getNameAsString(), "UTF-8"));
+      blockSizeConfigValue.append('=');
+      blockSizeConfigValue.append(URLEncoder.encode(
+          String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(BLOCK_SIZE_CONF_KEY, blockSizeConfigValue.toString());
+  }
+
   /**
    * Run inside the task to deserialize column family to compression algorithm
    * map from the
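configureBlockSize serializes the family-to-blocksize map the same way the compression and bloom-type maps are serialized: URL-encoded family=value pairs joined by '&'. (The "last ampersand" comment appears inherited from those methods; here the separator is prepended only when i++ > 0, so there is never a trailing ampersand to strip.) A self-contained sketch of the round trip, with hypothetical families; the decode loop only approximates what createFamilyConfValueMap does:

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

public class BlockSizeConfRoundTrip {
  public static void main(String[] args) throws UnsupportedEncodingException {
    // Encode two hypothetical families the way configureBlockSize does.
    String encoded = URLEncoder.encode("cf1", "UTF-8") + '='
        + URLEncoder.encode("8192", "UTF-8") + '&'
        + URLEncoder.encode("cf2", "UTF-8") + '='
        + URLEncoder.encode("65536", "UTF-8");
    System.out.println(encoded);  // cf1=8192&cf2=65536

    // Decode; this loop approximates createFamilyConfValueMap.
    Map<String, Integer> blockSizes = new HashMap<String, Integer>();
    for (String pair : encoded.split("&")) {
      String[] kv = pair.split("=");
      blockSizes.put(URLDecoder.decode(kv[0], "UTF-8"),
          Integer.valueOf(URLDecoder.decode(kv[1], "UTF-8")));
    }
    System.out.println(blockSizes);  // {cf1=8192, cf2=65536}
  }
}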
@@ -377,6 +404,10 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
   }

+  private static Map<byte[], String> createFamilyBlockSizeMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOCK_SIZE_CONF_KEY);
+  }
+
   /**
    * Run inside the task to deserialize column family to given conf value map.
    *
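End to end, a bulk-load job only has to call configureIncrementalLoad; after this commit that call also carries each family's block size to the tasks. A sketch with hypothetical names, using the 0.95-era API this diff targets:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulkload-sketch");   // hypothetical job name
    HTable table = new HTable(conf, "mytable");   // hypothetical table name
    // Serializes compression, bloom type and (now) block size per family
    // into the job conf, and sets up the total-order partitioner.
    HFileOutputFormat.configureIncrementalLoad(job, table);
  }
}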