HBASE-21810 bulkload: support setting HFile compression on the client

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
chenyechao 2019-03-11 19:28:08 +08:00 committed by Guanghao Zhang
parent 648fb72702
commit a54f0bfb8f
2 changed files with 56 additions and 2 deletions

View File

@ -160,9 +160,11 @@ public class HFileOutputFormat2
// This constant is public since the client can modify this when setting
// up their conf object and thus refer to this symbol.
// It is present for backwards compatibility reasons. Use it only to
// override the auto-detection of datablock encoding.
// override the auto-detection of datablock encoding and compression.
public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
public static final String COMPRESSION_OVERRIDE_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.compression";
/**
* Keep locality while generating HFiles for bulkload. See HBASE-12596
@ -210,6 +212,14 @@ public class HFileOutputFormat2
Compression.Algorithm.NONE.getName());
final Algorithm defaultCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr);
String compressionStr = conf.get(COMPRESSION_OVERRIDE_CONF_KEY);
final Algorithm overriddenCompression;
if (compressionStr != null) {
overriddenCompression = Compression.getCompressionAlgorithmByName(compressionStr);
} else {
overriddenCompression = null;
}
final boolean compactionExclude = conf.getBoolean(
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
@ -399,7 +409,8 @@ public class HFileOutputFormat2
new Path(getTableRelativePath(tableName), Bytes.toString(family)));
}
WriterLength wl = new WriterLength();
Algorithm compression = compressionMap.get(tableAndFamily);
Algorithm compression = overriddenCompression;
compression = compression == null ? compressionMap.get(tableAndFamily) : compression;
compression = compression == null ? defaultCompression : compression;
BloomType bloomType = bloomTypeMap.get(tableAndFamily);
bloomType = bloomType == null ? BloomType.NONE : bloomType;

View File

@ -1551,5 +1551,48 @@ public class TestHFileOutputFormat2 {
Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
Assert.assertTrue(fs.exists(new Path(partitionPathString)));
}
@Test
public void TestConfigureCompression() throws Exception {
  // Verifies that setting COMPRESSION_OVERRIDE_CONF_KEY on the client-side
  // Configuration forces HFileOutputFormat2 to write HFiles with the
  // requested compression algorithm ("gz"), overriding per-family settings.
  Configuration conf = new Configuration(this.util.getConfiguration());
  RecordWriter<ImmutableBytesWritable, Cell> writer = null;
  TaskAttemptContext context = null;
  Path dir = util.getDataTestDir("TestConfigureCompression");
  String hfileoutputformatCompression = "gz";
  try {
    conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
    // Locality is irrelevant here and would require a running cluster lookup.
    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
    conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
    Job job = Job.getInstance(conf);
    FileOutputFormat.setOutputPath(job, dir);
    context = createTestTaskAttemptContext(job);
    HFileOutputFormat2 hof = new HFileOutputFormat2();
    writer = hof.getRecordWriter(context);
    // Write a single trivial cell so at least one HFile is produced.
    final byte[] b = Bytes.toBytes("b");
    KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
    writer.write(new ImmutableBytesWritable(), kv);
    writer.close(context);
    writer = null;
    FileSystem fs = dir.getFileSystem(conf);
    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
    while (iterator.hasNext()) {
      LocatedFileStatus keyFileStatus = iterator.next();
      HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
      try {
        // JUnit assertEquals takes (expected, actual); the original had the
        // arguments swapped, which inverts the failure message.
        assertEquals(hfileoutputformatCompression, reader.getCompressionAlgorithm().getName());
      } finally {
        // Close the reader to avoid leaking file handles across iterations.
        reader.close();
      }
    }
  } finally {
    if (writer != null && context != null) {
      writer.close(context);
    }
    dir.getFileSystem(conf).delete(dir, true);
  }
}
}