HBASE-3474 HFileOutputFormat to use column family's compression algorithm

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1085179 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-03-24 22:59:45 +00:00
parent a344cd98b7
commit 4c59785135
4 changed files with 295 additions and 11 deletions

View File

@ -97,6 +97,7 @@ Release 0.91.0 - Unreleased
number of maps
HBASE-3673 Reduce HTable Pool Contention Using Concurrent Collections
(Karthick Sankarachary via Stack)
HBASE-3474 HFileOutputFormat to use column family's compression algorithm
TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -1216,6 +1216,10 @@ public class HFile {
return this.comparator;
}
public Compression.Algorithm getCompressionAlgorithm() {
return this.compressAlgo;
}
/**
* @return index size
*/

View File

@ -20,9 +20,13 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -32,7 +36,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@ -64,6 +70,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
@ -78,9 +85,12 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
final int blocksize = conf.getInt("hfile.min.blocksize.size",
HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String compression = conf.get("hfile.compression",
final String defaultCompression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
private final Map<byte [], WriterLength> writers =
@ -153,6 +163,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
private WriterLength getNewWriter(byte[] family) throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
wl.writer = new HFile.Writer(fs,
StoreFile.getUniqueFile(fs, familydir), blocksize,
compression, KeyValue.KEY_COMPARATOR);
@ -300,7 +312,69 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
DistributedCache.addCacheFile(cacheUri, conf);
DistributedCache.createSymlink(conf);
// Set compression algorithms based on column families
configureCompression(table, conf);
LOG.info("Incremental table output configured.");
}
/**
* Run inside the task to deserialize column family to compression algorithm
* map from the
* configuration.
*
* Package-private for unit tests only.
*
* @return a map from column family to the name of the configured compression
* algorithm
*/
static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
for (String familyConf : compressionConf.split("&")) {
String[] familySplit = familyConf.split("=");
if (familySplit.length != 2) {
continue;
}
try {
compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
throw new AssertionError(e);
}
}
return compressionMap;
}
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
*
* Package-private for unit tests only.
*
* @throws IOException
* on failure to read column family descriptors
*/
static void configureCompression(HTable table, Configuration conf) throws IOException {
StringBuilder compressionConfigValue = new StringBuilder();
HTableDescriptor tableDescriptor = table.getTableDescriptor();
if(tableDescriptor == null){
// could happen with mock table instance
return;
}
Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
int i = 0;
for (HColumnDescriptor familyDescriptor : families) {
if (i++ > 0) {
compressionConfigValue.append('&');
}
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
compressionConfigValue.append('=');
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
}
// Get rid of the last ampersand
conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
}
}

View File

@ -23,9 +23,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -36,7 +41,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -45,6 +52,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.NullWritable;
@ -58,6 +69,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
/**
* Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
* Sets up and runs a mapreduce job that writes hfile output.
@ -232,14 +245,7 @@ public class TestHFileOutputFormat {
public void testJobConfiguration() throws Exception {
Job job = new Job();
HTable table = Mockito.mock(HTable.class);
byte[][] mockKeys = new byte[][] {
HConstants.EMPTY_BYTE_ARRAY,
Bytes.toBytes("aaa"),
Bytes.toBytes("ggg"),
Bytes.toBytes("zzz")
};
Mockito.doReturn(mockKeys).when(table).getStartKeys();
setupMockStartKeys(table);
HFileOutputFormat.configureIncrementalLoad(job, table);
assertEquals(job.getNumReduceTasks(), 4);
}
@ -372,6 +378,205 @@ public class TestHFileOutputFormat {
assertTrue(job.waitForCompletion(true));
}
/**
* Test for
* {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
* that the compression map is correctly deserialized from configuration
*
* @throws IOException
*/
@Test
public void testCreateFamilyCompressionMap() throws IOException {
for (int numCfs = 0; numCfs <= 3; numCfs++) {
Configuration conf = new Configuration(this.util.getConfiguration());
Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
HTable table = Mockito.mock(HTable.class);
setupMockColumnFamilies(table, familyToCompression);
HFileOutputFormat.configureCompression(table, conf);
// read back family specific compression setting from the configuration
Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf);
// test that we have a value for all column families that matches with the
// used mock values
for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
.getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
}
}
}
private void setupMockColumnFamilies(HTable table,
Map<String, Compression.Algorithm> familyToCompression) throws IOException
{
HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey().getBytes(), 1, entry.getValue().getName(),
false, false, 0, "none"));
}
Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
}
private void setupMockStartKeys(HTable table) throws IOException {
byte[][] mockKeys = new byte[][] {
HConstants.EMPTY_BYTE_ARRAY,
Bytes.toBytes("aaa"),
Bytes.toBytes("ggg"),
Bytes.toBytes("zzz")
};
Mockito.doReturn(mockKeys).when(table).getStartKeys();
}
/**
* @return a map from column family names to compression algorithms for
* testing column family compression. Column family names have special characters
*/
private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
// use column family names having special characters
if (numCfs-- > 0) {
familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
}
if (numCfs-- > 0) {
familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
}
if (numCfs-- > 0) {
familyToCompression.put("Family3", Compression.Algorithm.NONE);
}
return familyToCompression;
}
/**
* Test that {@link HFileOutputFormat} RecordWriter uses compression settings
* from the column family descriptor
*/
@Test
public void testColumnFamilyCompression()
throws IOException, InterruptedException {
Configuration conf = new Configuration(this.util.getConfiguration());
RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
TaskAttemptContext context = null;
Path dir =
HBaseTestingUtility.getTestDir("testColumnFamilyCompression");
HTable table = Mockito.mock(HTable.class);
Map<String, Compression.Algorithm> configuredCompression =
new HashMap<String, Compression.Algorithm>();
Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
int familyIndex = 0;
for (byte[] family : FAMILIES) {
configuredCompression.put(Bytes.toString(family),
supportedAlgos[familyIndex++ % supportedAlgos.length]);
}
setupMockColumnFamilies(table, configuredCompression);
// set up the table to return some mock keys
setupMockStartKeys(table);
try {
// partial map red setup to get an operational writer for testing
Job job = new Job(conf, "testLocalMRIncrementalLoad");
setupRandomGeneratorMapper(job);
HFileOutputFormat.configureIncrementalLoad(job, table);
FileOutputFormat.setOutputPath(job, dir);
context = new TaskAttemptContext(job.getConfiguration(),
new TaskAttemptID());
HFileOutputFormat hof = new HFileOutputFormat();
writer = hof.getRecordWriter(context);
// write out random rows
writeRandomKeyValues(writer, context, ROWSPERSPLIT);
writer.close(context);
// Make sure that a directory was created for every CF
FileSystem fileSystem = dir.getFileSystem(conf);
// commit so that the filesystem has one directory per column family
hof.getOutputCommitter(context).commitTask(context);
for (byte[] family : FAMILIES) {
String familyStr = new String(family);
boolean found = false;
for (FileStatus f : fileSystem.listStatus(dir)) {
if (Bytes.toString(family).equals(f.getPath().getName())) {
// we found a matching directory
found = true;
// verify that the compression on this file matches the configured
// compression
Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
Reader reader = new HFile.Reader(fileSystem, dataFilePath, null, false, true);
reader.loadFileInfo();
assertEquals("Incorrect compression used for column family " + familyStr
+ "(reader: " + reader + ")",
configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
break;
}
}
if (!found) {
fail("HFile for column family " + familyStr + " not found");
}
}
} finally {
dir.getFileSystem(conf).delete(dir, true);
}
}
/**
* @return
*/
private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
for (String algoName : allAlgos) {
try {
Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
algo.getCompressor();
supportedAlgos.add(algo);
}catch (Exception e) {
// this algo is not available
}
}
return supportedAlgos.toArray(new Compression.Algorithm[0]);
}
/**
* Write random values to the writer assuming a table created using
* {@link #FAMILIES} as column family descriptors
*/
private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer, TaskAttemptContext context,
int numRows)
throws IOException, InterruptedException {
byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
int valLength = 10;
byte valBytes[] = new byte[valLength];
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
Random random = new Random();
for (int i = 0; i < numRows; i++) {
Bytes.putInt(keyBytes, 0, i);
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat.FAMILIES) {
KeyValue kv = new KeyValue(keyBytes, family,
PerformanceEvaluation.QUALIFIER_NAME, valBytes);
writer.write(key, kv);
}
}
}
public static void main(String args[]) throws Exception {
new TestHFileOutputFormat().manualTest(args);
}