HBASE-2988 Support alternate compression for major compactions
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997168 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
02cd4a9b01
commit
762e374861
|
@ -906,6 +906,7 @@ Release 0.21.0 - Unreleased
|
||||||
'IllegalArgumentException: Wrong FS'
|
'IllegalArgumentException: Wrong FS'
|
||||||
HBASE-2977 Refactor master command line to a new class
|
HBASE-2977 Refactor master command line to a new class
|
||||||
HBASE-2980 Refactor region server command line to a new class
|
HBASE-2980 Refactor region server command line to a new class
|
||||||
|
HBASE-2988 Support alternate compression for major compactions
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -72,6 +72,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final String COMPRESSION = "COMPRESSION";
|
public static final String COMPRESSION = "COMPRESSION";
|
||||||
|
public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
|
||||||
public static final String BLOCKCACHE = "BLOCKCACHE";
|
public static final String BLOCKCACHE = "BLOCKCACHE";
|
||||||
public static final String BLOCKSIZE = "BLOCKSIZE";
|
public static final String BLOCKSIZE = "BLOCKSIZE";
|
||||||
public static final String LENGTH = "LENGTH";
|
public static final String LENGTH = "LENGTH";
|
||||||
|
@ -353,6 +354,19 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
|
||||||
/** @return compression type being used for the column family */
|
/** @return compression type being used for the column family */
|
||||||
public Compression.Algorithm getCompression() {
|
public Compression.Algorithm getCompression() {
|
||||||
String n = getValue(COMPRESSION);
|
String n = getValue(COMPRESSION);
|
||||||
|
if (n == null) {
|
||||||
|
return Compression.Algorithm.NONE;
|
||||||
|
}
|
||||||
|
return Compression.Algorithm.valueOf(n.toUpperCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return compression type being used for the column family for major
|
||||||
|
compression */
|
||||||
|
public Compression.Algorithm getCompactionCompression() {
|
||||||
|
String n = getValue(COMPRESSION_COMPACT);
|
||||||
|
if (n == null) {
|
||||||
|
return getCompression();
|
||||||
|
}
|
||||||
return Compression.Algorithm.valueOf(n.toUpperCase());
|
return Compression.Algorithm.valueOf(n.toUpperCase());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,6 +431,30 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
|
||||||
setValue(COMPRESSION, compressionType);
|
setValue(COMPRESSION, compressionType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Compression type setting.
|
||||||
|
*/
|
||||||
|
public Compression.Algorithm getCompactionCompressionType() {
|
||||||
|
return getCompactionCompression();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compression types supported in hbase.
|
||||||
|
* LZO is not bundled as part of the hbase distribution.
|
||||||
|
* See <a href="http://wiki.apache.org/hadoop/UsingLzoCompression">LZO Compression</a>
|
||||||
|
* for how to enable it.
|
||||||
|
* @param type Compression type setting.
|
||||||
|
*/
|
||||||
|
public void setCompactionCompressionType(Compression.Algorithm type) {
|
||||||
|
String compressionType;
|
||||||
|
switch (type) {
|
||||||
|
case LZO: compressionType = "LZO"; break;
|
||||||
|
case GZ: compressionType = "GZ"; break;
|
||||||
|
default: compressionType = "NONE"; break;
|
||||||
|
}
|
||||||
|
setValue(COMPRESSION_COMPACT, compressionType);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if we are to keep all in use HRegionServer cache.
|
* @return True if we are to keep all in use HRegionServer cache.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -112,7 +112,10 @@ public class Store implements HeapSize {
|
||||||
private final int compactionThreshold;
|
private final int compactionThreshold;
|
||||||
private final int blocksize;
|
private final int blocksize;
|
||||||
private final boolean blockcache;
|
private final boolean blockcache;
|
||||||
|
/** Compression algorithm for flush files and minor compaction */
|
||||||
private final Compression.Algorithm compression;
|
private final Compression.Algorithm compression;
|
||||||
|
/** Compression algorithm for major compaction */
|
||||||
|
private final Compression.Algorithm compactionCompression;
|
||||||
|
|
||||||
// Comparing KeyValues
|
// Comparing KeyValues
|
||||||
final KeyValue.KVComparator comparator;
|
final KeyValue.KVComparator comparator;
|
||||||
|
@ -144,6 +147,11 @@ public class Store implements HeapSize {
|
||||||
this.blockcache = family.isBlockCacheEnabled();
|
this.blockcache = family.isBlockCacheEnabled();
|
||||||
this.blocksize = family.getBlocksize();
|
this.blocksize = family.getBlocksize();
|
||||||
this.compression = family.getCompression();
|
this.compression = family.getCompression();
|
||||||
|
// avoid overriding compression setting for major compactions if the user
|
||||||
|
// has not specified it separately
|
||||||
|
this.compactionCompression =
|
||||||
|
(family.getCompactionCompression() != Compression.Algorithm.NONE) ?
|
||||||
|
family.getCompactionCompression() : this.compression;
|
||||||
this.comparator = info.getComparator();
|
this.comparator = info.getComparator();
|
||||||
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
|
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
|
||||||
this.ttl = family.getTimeToLive();
|
this.ttl = family.getTimeToLive();
|
||||||
|
@ -487,12 +495,24 @@ public class Store implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
* @param maxKeyCount
|
||||||
* @return Writer for a new StoreFile in the tmp dir.
|
* @return Writer for a new StoreFile in the tmp dir.
|
||||||
*/
|
*/
|
||||||
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
|
private StoreFile.Writer createWriterInTmp(int maxKeyCount)
|
||||||
|
throws IOException {
|
||||||
|
return createWriterInTmp(maxKeyCount, this.compression);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @param maxKeyCount
|
||||||
|
* @param compression Compression algorithm to use
|
||||||
|
* @return Writer for a new StoreFile in the tmp dir.
|
||||||
|
*/
|
||||||
|
private StoreFile.Writer createWriterInTmp(int maxKeyCount,
|
||||||
|
Compression.Algorithm compression)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
|
return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
|
||||||
this.compression, this.comparator, this.conf,
|
compression, this.comparator, this.conf,
|
||||||
this.family.getBloomFilterType(), maxKeyCount);
|
this.family.getBloomFilterType(), maxKeyCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -804,7 +824,8 @@ public class Store implements HeapSize {
|
||||||
// output to writer:
|
// output to writer:
|
||||||
for (KeyValue kv : kvs) {
|
for (KeyValue kv : kvs) {
|
||||||
if (writer == null) {
|
if (writer == null) {
|
||||||
writer = createWriterInTmp(maxKeyCount);
|
writer = createWriterInTmp(maxKeyCount,
|
||||||
|
this.compactionCompression);
|
||||||
}
|
}
|
||||||
writer.append(kv);
|
writer.append(kv);
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,7 +132,11 @@ module Hbase
|
||||||
end
|
end
|
||||||
|
|
||||||
# Add column to the table
|
# Add column to the table
|
||||||
htd.addFamily(hcd(arg))
|
descriptor = hcd(arg)
|
||||||
|
if arg[COMPRESSION_COMPACT]
|
||||||
|
descriptor.setValue(COMPRESSION_COMPACT, arg[COMPRESSION_COMPACT])
|
||||||
|
end
|
||||||
|
htd.addFamily(descriptor)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Perform the create table call
|
# Perform the create table call
|
||||||
|
@ -216,6 +220,9 @@ module Hbase
|
||||||
# No method parameter, try to use the args as a column definition
|
# No method parameter, try to use the args as a column definition
|
||||||
unless method = arg.delete(METHOD)
|
unless method = arg.delete(METHOD)
|
||||||
descriptor = hcd(arg)
|
descriptor = hcd(arg)
|
||||||
|
if arg[COMPRESSION_COMPACT]
|
||||||
|
descriptor.setValue(COMPRESSION_COMPACT, arg[COMPRESSION_COMPACT])
|
||||||
|
end
|
||||||
column_name = descriptor.getNameAsString
|
column_name = descriptor.getNameAsString
|
||||||
|
|
||||||
# If column already exist, then try to alter it. Create otherwise.
|
# If column already exist, then try to alter it. Create otherwise.
|
||||||
|
|
Loading…
Reference in New Issue