HBASE-7391 - Review/improve HLog compression's memory consumption (Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1513961 13f79535-47bb-0310-9956-ffa450edef68
ramkrishna 2013-08-14 16:54:49 +00:00
parent e79dea8fb4
commit d21859e048
11 changed files with 99 additions and 48 deletions
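
In short: Dictionary gains an init(int) method so each dictionary is sized
explicitly, instead of LRUDictionary unconditionally preallocating
Short.MAX_VALUE (32767) node slots. CompressionContext takes a new
recoveredEdits flag and sizes the region and table dictionaries to a single
entry for recovered.edits files, which only ever cover one region and one
table. The duplicated compression setup in ProtobufLogWriter and
SequenceFileLogWriter moves into a new WriterBase.initializeCompressionContext(),
and readers and writers both detect recovered edits via a new
FSUtils.isRecoveredEdits(path) helper.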

CompressionContext.java

@@ -33,7 +33,7 @@ class CompressionContext {
   final Dictionary qualifierDict;
   final Dictionary rowDict;
 
-  public CompressionContext(Class<? extends Dictionary> dictType)
+  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
       throws SecurityException, NoSuchMethodException, InstantiationException,
       IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor =
@@ -43,6 +43,17 @@ class CompressionContext {
     familyDict = dictConstructor.newInstance();
     qualifierDict = dictConstructor.newInstance();
     rowDict = dictConstructor.newInstance();
+    if (recoveredEdits) {
+      // This will never change
+      regionDict.init(1);
+      tableDict.init(1);
+    } else {
+      regionDict.init(Short.MAX_VALUE);
+      tableDict.init(Short.MAX_VALUE);
+    }
+    rowDict.init(Short.MAX_VALUE);
+    familyDict.init(Byte.MAX_VALUE);
+    qualifierDict.init(Byte.MAX_VALUE);
   }
 
   void clear() {
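
How the two constructor modes are used, as a minimal sketch (the
reflection-related checked exceptions declared above are left unhandled here):

    // Live WAL: full-size dictionaries, since many regions and tables
    // share one log.
    CompressionContext walCtx =
        new CompressionContext(LRUDictionary.class, false);
    // recovered.edits file: it belongs to exactly one region and one table,
    // so regionDict and tableDict each get a single slot.
    CompressionContext recoveredCtx =
        new CompressionContext(LRUDictionary.class, true);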

Dictionary.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 interface Dictionary {
   byte NOT_IN_DICTIONARY = -1;
 
+  void init(int initialSize);
   /**
    * Gets an entry from the dictionary.
    *
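
Construction is now two-phase: implementations keep a no-arg constructor
(CompressionContext instantiates them reflectively) and are then sized with
init(). A minimal usage sketch against the LRUDictionary implementation below:

    Dictionary dict = new LRUDictionary();
    dict.init(Byte.MAX_VALUE);  // e.g. a family dictionary: at most 127 entries
    byte[] cf = Bytes.toBytes("cf");
    // First sighting returns Dictionary.NOT_IN_DICTIONARY (-1) and adds the
    // entry; a second call returns its index.
    short idx = dict.findEntry(cf, 0, cf.length);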

LRUDictionary.java

@@ -35,13 +35,17 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public class LRUDictionary implements Dictionary {
-  private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();
 
+  BidirectionalLRUMap backingStore;
   @Override
   public byte[] getEntry(short idx) {
     return backingStore.get(idx);
   }
 
+  @Override
+  public void init(int initialSize) {
+    backingStore = new BidirectionalLRUMap(initialSize);
+  }
   @Override
   public short findEntry(byte[] data, int offset, int length) {
     short ret = backingStore.findIdx(data, offset, length);
@@ -69,7 +73,6 @@ public class LRUDictionary implements Dictionary {
    * This is not thread safe. Don't use in multi-threaded applications.
    */
   static class BidirectionalLRUMap {
-    static final int MAX_SIZE = Short.MAX_VALUE;
     private int currSize = 0;
 
     // Head and tail of the LRU list.
@@ -77,10 +80,13 @@ public class LRUDictionary implements Dictionary {
     private Node tail;
 
     private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
-    private Node[] indexToNode = new Node[MAX_SIZE];
+    private Node[] indexToNode;
+    private int initSize = 0;
 
-    public BidirectionalLRUMap() {
-      for (int i = 0; i < MAX_SIZE; i++) {
+    public BidirectionalLRUMap(int initialSize) {
+      initSize = initialSize;
+      indexToNode = new Node[initialSize];
+      for (int i = 0; i < initialSize; i++) {
         indexToNode[i] = new Node();
       }
     }
@@ -91,7 +97,7 @@ public class LRUDictionary implements Dictionary {
       byte[] stored = new byte[length];
       Bytes.putBytes(stored, 0, array, offset, length);
 
-      if (currSize < MAX_SIZE) {
+      if (currSize < initSize) {
         // There is space to add without evicting.
         indexToNode[currSize].setContents(stored, 0, stored.length);
         setHead(indexToNode[currSize]);
@@ -174,7 +180,7 @@ public class LRUDictionary implements Dictionary {
         n.container = null;
       }
 
-      for (int i = 0; i < MAX_SIZE; i++) {
+      for (int i = 0; i < initSize; i++) {
         indexToNode[i].next = null;
         indexToNode[i].prev = null;
       }

ProtobufLogWriter.java

@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
  * Writer for protobuf-based WAL.
  */
 @InterfaceAudience.Private
-public class ProtobufLogWriter implements HLog.Writer {
+public class ProtobufLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   private FSDataOutputStream output;
   private Codec.Encoder cellEncoder;
@@ -50,10 +50,6 @@ public class ProtobufLogWriter implements HLog.Writer {
   // than this size, it is written/read respectively, with a WARN message in the log.
   private int trailerWarnSize;
 
-  /** Context used by our wal dictionary compressor.
-   * Null if we're not to do our custom dictionary compression. */
-  private CompressionContext compressionContext;
-
   public ProtobufLogWriter() {
     super();
   }
@@ -61,14 +57,7 @@ public class ProtobufLogWriter implements HLog.Writer {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf) throws IOException {
     assert this.output == null;
-    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (doCompress) {
-      try {
-        this.compressionContext = new CompressionContext(LRUDictionary.class);
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
       HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
     int bufferSize = FSUtils.getDefaultBufferSize(fs);
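
The decision to compress is unchanged and still keyed off
HConstants.ENABLE_WAL_COMPRESSION; only its plumbing moved into the shared
helper. Enabling it, as a sketch:

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    // ProtobufLogWriter.init(fs, path, conf) now builds a CompressionContext
    // sized according to whether 'path' is under a recovered.edits directory.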

ReaderBase.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 @InterfaceAudience.Private
 public abstract class ReaderBase implements HLog.Reader {
@@ -67,7 +68,8 @@ public abstract class ReaderBase implements HLog.Reader {
     // If compression is enabled, new dictionaries are created here.
     try {
       if (compressionContext == null) {
-        compressionContext = new CompressionContext(LRUDictionary.class);
+        compressionContext = new CompressionContext(LRUDictionary.class,
+            FSUtils.isRecoveredEdits(path));
       } else {
         compressionContext.clear();
       }
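
Note the symmetry this relies on: WriterBase (below) and ReaderBase both
derive the recoveredEdits flag from the same FSUtils.isRecoveredEdits(path)
check, so the two sides build identically sized dictionaries for a given file;
the dictionary indices in the stream would stop lining up if they disagreed.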

WriterBase.java (new file)

@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Base class for WAL writers. Holds the context used by our wal dictionary
+ * compressor; null if we're not to do our custom dictionary compression.
+ */
+@InterfaceAudience.Private
+public abstract class WriterBase implements HLog.Writer {
+
+  protected CompressionContext compressionContext;
+
+  public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
+    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    if (doCompress) {
+      try {
+        this.compressionContext = new CompressionContext(LRUDictionary.class,
+            FSUtils.isRecoveredEdits(path));
+      } catch (Exception e) {
+        throw new IOException("Failed to initiate CompressionContext", e);
+      }
+    }
+    return doCompress;
+  }
+}
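
A sketch of how a concrete writer uses this base class (both subclasses in
this patch follow the pattern; ExampleLogWriter and the elided stream handling
are illustrative only):

    public class ExampleLogWriter extends WriterBase {
      @Override
      public void init(FileSystem fs, Path path, Configuration conf)
          throws IOException {
        boolean doCompress = initializeCompressionContext(conf, path);
        // ... open the output; when doCompress is true, hand
        // this.compressionContext to the entry encoder so WAL entries are
        // dictionary-compressed ...
      }
      // remaining HLog.Writer methods (append, sync, close, ...) elided
    }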

FSUtils.java

@@ -1321,6 +1321,15 @@ public abstract class FSUtils {
     return tabledirs;
   }
 
+  /**
+   * Checks if the given path is the one with the 'recovered.edits' dir.
+   * @param path the path to check
+   * @return true if the path contains a 'recovered.edits' directory
+   */
+  public static boolean isRecoveredEdits(Path path) {
+    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
+  }
+
   /**
    * Filter for all dirs that don't start with '.'
    */
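
HConstants.RECOVERED_EDITS_DIR is "recovered.edits", so this is a plain
substring match on the path. Illustrative calls (paths are made up):

    FSUtils.isRecoveredEdits(new Path(
        "/hbase/t1/abcd1234/recovered.edits/0000000000000001000"));      // true
    FSUtils.isRecoveredEdits(new Path("/hbase/.logs/rs1,60020,1/wal.1")); // false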

SequenceFileLogWriter.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
  * SequenceFile.Writer. Legacy implementation only used for compat tests.
  */
 @InterfaceAudience.Private
-public class SequenceFileLogWriter implements HLog.Writer {
+public class SequenceFileLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   // The sequence file we delegate to.
   private SequenceFile.Writer writer;
@@ -58,13 +58,6 @@ public class SequenceFileLogWriter implements HLog.Writer {
   private static final Text WAL_VERSION_KEY = new Text("version");
   private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
   private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
-
-  /**
-   * Context used by our wal dictionary compressor. Null if we're not to do
-   * our custom dictionary compression. This custom WAL compression is distinct
-   * from sequencefile native compression.
-   */
-  private CompressionContext compressionContext;
 
   /**
    * Default constructor.
@@ -72,7 +65,6 @@ public class SequenceFileLogWriter implements HLog.Writer {
   public SequenceFileLogWriter() {
     super();
   }
-
   /**
    * Create sequence file Metadata for our WAL file with version and compression
    * type (if any).
@@ -94,19 +86,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
   throws IOException {
-    // Should we do our custom WAL compression?
-    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (compress) {
-      try {
-        if (this.compressionContext == null) {
-          this.compressionContext = new CompressionContext(LRUDictionary.class);
-        } else {
-          this.compressionContext.clear();
-        }
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean compress = initializeCompressionContext(conf, path);
 
     // Create a SF.Writer instance.
     try {

TestCompressor.java

@@ -74,6 +74,7 @@ public class TestCompressor {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     Dictionary dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] blahBytes = Bytes.toBytes("blah");
     Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
     dos.close();
@@ -81,6 +82,7 @@ public class TestCompressor {
     DataInputStream dis =
       new DataInputStream(new ByteArrayInputStream(dosbytes));
     dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] product = Compressor.readCompressed(dis, dictionary);
     assertTrue(Bytes.equals(blahBytes, product));
   }
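
One consequence the test changes highlight: nothing enforces the second phase
of construction. An LRUDictionary whose init() was never called still has a
null backingStore, so the first findEntry/getEntry call fails with a
NullPointerException; accordingly, every construction site in this patch gains
a matching init() call.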

TestKeyValueCompression.java

@@ -64,7 +64,7 @@ public class TestKeyValueCompression {
   }
 
   private void runTestCycle(List<KeyValue> kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     for (KeyValue kv : kvs) {
       KeyValueCompression.writeKV(buf, kv, ctx);

TestLRUDictionary.java

@@ -41,6 +41,7 @@ public class TestLRUDictionary {
   @Before
   public void setUp() throws Exception {
     testee = new LRUDictionary();
+    testee.init(Short.MAX_VALUE);
   }
 
   @Test
@@ -110,7 +111,7 @@ public class TestLRUDictionary {
   @Test
   public void TestLRUPolicy(){
     //start by filling the dictionary up with byte arrays
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
       testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
         (BigInteger.valueOf(i)).toByteArray().length);
     }
@@ -132,13 +133,13 @@ public class TestLRUDictionary {
     assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
       BigInteger.ZERO.toByteArray().length) != -1);
     // Now go from beyond 1 to the end.
-    for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for(int i = 1; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
         BigInteger.valueOf(i).toByteArray().length) == -1);
     }
 
     // check we can find all of these.
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
         BigInteger.valueOf(i).toByteArray().length) != -1);
     }