From 762e374861d0a8e7a0f6710f05890877f34b4b3a Mon Sep 17 00:00:00 2001
From: Andrew Kyle Purtell
Date: Wed, 15 Sep 2010 02:01:56 +0000
Subject: [PATCH] HBASE-2988 Support alternate compression for major compactions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997168 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                   |  1 +
 .../hadoop/hbase/HColumnDescriptor.java       | 38 +++++++++++++++++++
 .../hadoop/hbase/regionserver/Store.java      | 25 +++++++++++-
 src/main/ruby/hbase/admin.rb                  |  9 ++++-
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 40d20704bbf..c481101c999 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -906,6 +906,7 @@ Release 0.21.0 - Unreleased
               'IllegalArgumentException: Wrong FS'
    HBASE-2977  Refactor master command line to a new class
    HBASE-2980  Refactor region server command line to a new class
+   HBASE-2988  Support alternate compression for major compactions
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts
diff --git a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 8e3bd5361f4..7da7d7a9530 100644
--- a/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -72,6 +72,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   }
 
   public static final String COMPRESSION = "COMPRESSION";
+  public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
   public static final String BLOCKCACHE = "BLOCKCACHE";
   public static final String BLOCKSIZE = "BLOCKSIZE";
   public static final String LENGTH = "LENGTH";
@@ -353,6 +354,19 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   /** @return compression type being used for the column family */
   public Compression.Algorithm getCompression() {
     String n = getValue(COMPRESSION);
+    if (n == null) {
+      return Compression.Algorithm.NONE;
+    }
+    return Compression.Algorithm.valueOf(n.toUpperCase());
+  }
+
+  /** @return compression type being used for the column family for major
+      compaction */
+  public Compression.Algorithm getCompactionCompression() {
+    String n = getValue(COMPRESSION_COMPACT);
+    if (n == null) {
+      return getCompression();
+    }
     return Compression.Algorithm.valueOf(n.toUpperCase());
   }
 
@@ -417,6 +431,30 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     setValue(COMPRESSION, compressionType);
   }
 
+  /**
+   * @return Compression type setting for major compactions.
+   */
+  public Compression.Algorithm getCompactionCompressionType() {
+    return getCompactionCompression();
+  }
+
+  /**
+   * Compression types supported in hbase.
+   * LZO is not bundled as part of the hbase distribution.
+   * See <a href="http://wiki.apache.org/hadoop/UsingLzoCompression">LZO Compression</a>
+   * for how to enable it.
+   * @param type Compression type setting.
+   */
+  public void setCompactionCompressionType(Compression.Algorithm type) {
+    String compressionType;
+    switch (type) {
+      case LZO: compressionType = "LZO"; break;
+      case GZ: compressionType = "GZ"; break;
+      default: compressionType = "NONE"; break;
+    }
+    setValue(COMPRESSION_COMPACT, compressionType);
+  }
+
   /**
    * @return True if we are to keep all in use HRegionServer cache.
    */
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 4f777f0b515..ea70619f3fe 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -112,7 +112,10 @@ public class Store implements HeapSize {
   private final int compactionThreshold;
   private final int blocksize;
   private final boolean blockcache;
+  /** Compression algorithm for flush files and minor compaction */
   private final Compression.Algorithm compression;
+  /** Compression algorithm for major compaction */
+  private final Compression.Algorithm compactionCompression;
 
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
@@ -144,6 +147,11 @@ public class Store implements HeapSize {
     this.blockcache = family.isBlockCacheEnabled();
     this.blocksize = family.getBlocksize();
     this.compression = family.getCompression();
+    // avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    this.compactionCompression =
+      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
+        family.getCompactionCompression() : this.compression;
     this.comparator = info.getComparator();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
@@ -487,12 +495,24 @@
   }
 
   /*
+   * @param maxKeyCount
    * @return Writer for a new StoreFile in the tmp dir.
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
+  throws IOException {
+    return createWriterInTmp(maxKeyCount, this.compression);
+  }
+
+  /*
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  private StoreFile.Writer createWriterInTmp(int maxKeyCount,
+    Compression.Algorithm compression)
   throws IOException {
     return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
-        this.compression, this.comparator, this.conf,
+        compression, this.comparator, this.conf,
         this.family.getBloomFilterType(), maxKeyCount);
   }
 
@@ -804,7 +824,8 @@
         // output to writer:
         for (KeyValue kv : kvs) {
           if (writer == null) {
-            writer = createWriterInTmp(maxKeyCount);
+            writer = createWriterInTmp(maxKeyCount,
+              this.compactionCompression);
           }
           writer.append(kv);
         }
diff --git a/src/main/ruby/hbase/admin.rb b/src/main/ruby/hbase/admin.rb
index 82d7e548f2e..83765a0eec2 100644
--- a/src/main/ruby/hbase/admin.rb
+++ b/src/main/ruby/hbase/admin.rb
@@ -132,7 +132,11 @@ module Hbase
       end
 
       # Add column to the table
-      htd.addFamily(hcd(arg))
+      descriptor = hcd(arg)
+      if arg[COMPRESSION_COMPACT]
+        descriptor.setValue(COMPRESSION_COMPACT, arg[COMPRESSION_COMPACT])
+      end
+      htd.addFamily(descriptor)
     end
 
     # Perform the create table call
@@ -216,6 +220,9 @@ module Hbase
       # No method parameter, try to use the args as a column definition
      unless method = arg.delete(METHOD)
        descriptor = hcd(arg)
+        if arg[COMPRESSION_COMPACT]
+          descriptor.setValue(COMPRESSION_COMPACT, arg[COMPRESSION_COMPACT])
+        end
        column_name = descriptor.getNameAsString
 
        # If column already exist, then try to alter it. Create otherwise.
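
For illustration, a minimal client-side sketch of what this patch enables: the
family's normal COMPRESSION setting still governs flushes and minor
compactions, while COMPRESSION_COMPACT selects the algorithm major compactions
use when they rewrite store files. The table name "t1" and family "cf" are
hypothetical, and the surrounding calls (HBaseConfiguration.create(),
HBaseAdmin, createTable) assume the stock client API of trunk at this
revision; treat this as a sketch, not part of the patch.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.io.hfile.Compression;

  public class CompactionCompressionExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      HBaseAdmin admin = new HBaseAdmin(conf);

      // Hypothetical family: keep flushes and minor compactions
      // uncompressed so the write path stays cheap ...
      HColumnDescriptor family = new HColumnDescriptor("cf");
      family.setCompressionType(Compression.Algorithm.NONE);
      // ... but have major compactions rewrite store files with GZ.
      // This stores the COMPRESSION_COMPACT attribute added by the patch.
      family.setCompactionCompressionType(Compression.Algorithm.GZ);

      HTableDescriptor table = new HTableDescriptor("t1");
      table.addFamily(family);
      admin.createTable(table);
    }
  }

From the shell, the same attribute rides along in the column spec, e.g.
{NAME => 'cf', COMPRESSION_COMPACT => 'GZ'}, which the admin.rb hunks above
copy onto the descriptor via setValue. Because Store falls back to the
flush-time algorithm when COMPRESSION_COMPACT is NONE, the attribute only
needs to be set when the two algorithms should differ.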