diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 5a877af09fa..a76cafac959 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -33,7 +33,7 @@ class CompressionContext {
   final Dictionary qualifierDict;
   final Dictionary rowDict;
 
-  public CompressionContext(Class<? extends Dictionary> dictType)
+  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
   throws SecurityException, NoSuchMethodException, InstantiationException,
   IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor =
@@ -43,6 +43,17 @@ class CompressionContext {
     familyDict = dictConstructor.newInstance();
     qualifierDict = dictConstructor.newInstance();
     rowDict = dictConstructor.newInstance();
+    if (recoveredEdits) {
+      // This will never change
+      regionDict.init(1);
+      tableDict.init(1);
+    } else {
+      regionDict.init(Short.MAX_VALUE);
+      tableDict.init(Short.MAX_VALUE);
+    }
+    rowDict.init(Short.MAX_VALUE);
+    familyDict.init(Byte.MAX_VALUE);
+    qualifierDict.init(Byte.MAX_VALUE);
   }
 
   void clear() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
index e99820ae19f..fd1c2645d74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 interface Dictionary {
   byte NOT_IN_DICTIONARY = -1;
 
+  void init(int initialSize);
   /**
    * Gets an entry from the dictionary.
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
index 6e0b20bd65d..1bea4470e0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
@@ -35,13 +35,17 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public class LRUDictionary implements Dictionary {
-  private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();
+  BidirectionalLRUMap backingStore;
 
   @Override
   public byte[] getEntry(short idx) {
     return backingStore.get(idx);
   }
 
+  @Override
+  public void init(int initialSize) {
+    backingStore = new BidirectionalLRUMap(initialSize);
+  }
   @Override
   public short findEntry(byte[] data, int offset, int length) {
     short ret = backingStore.findIdx(data, offset, length);
@@ -69,7 +73,6 @@ public class LRUDictionary implements Dictionary {
    * This is not thread safe. Don't use in multi-threaded applications.
    */
   static class BidirectionalLRUMap {
-    static final int MAX_SIZE = Short.MAX_VALUE;
     private int currSize = 0;
 
     // Head and tail of the LRU list.
@@ -77,10 +80,13 @@ public class LRUDictionary implements Dictionary {
     private Node tail;
 
     private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
-    private Node[] indexToNode = new Node[MAX_SIZE];
+    private Node[] indexToNode;
+    private int initSize = 0;
 
-    public BidirectionalLRUMap() {
-      for (int i = 0; i < MAX_SIZE; i++) {
+    public BidirectionalLRUMap(int initialSize) {
+      initSize = initialSize;
+      indexToNode = new Node[initialSize];
+      for (int i = 0; i < initialSize; i++) {
         indexToNode[i] = new Node();
       }
     }
@@ -91,7 +97,7 @@ public class LRUDictionary implements Dictionary {
       byte[] stored = new byte[length];
       Bytes.putBytes(stored, 0, array, offset, length);
 
-      if (currSize < MAX_SIZE) {
+      if (currSize < initSize) {
         // There is space to add without evicting.
         indexToNode[currSize].setContents(stored, 0, stored.length);
         setHead(indexToNode[currSize]);
@@ -174,7 +180,7 @@ public class LRUDictionary implements Dictionary {
         n.container = null;
       }
 
-      for (int i = 0; i < MAX_SIZE; i++) {
+      for (int i = 0; i < initSize; i++) {
         indexToNode[i].next = null;
         indexToNode[i].prev = null;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index c5521df5f1a..1174847e9a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
  * Writer for protobuf-based WAL.
  */
 @InterfaceAudience.Private
-public class ProtobufLogWriter implements HLog.Writer {
+public class ProtobufLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   private FSDataOutputStream output;
   private Codec.Encoder cellEncoder;
@@ -50,10 +50,6 @@ public class ProtobufLogWriter extends WriterBase {
   // than this size, it is written/read respectively, with a WARN message in the log.
   private int trailerWarnSize;
 
-  /** Context used by our wal dictionary compressor.
-   * Null if we're not to do our custom dictionary compression.
-   */
-  private CompressionContext compressionContext;
 
   public ProtobufLogWriter() {
     super();
   }
@@ -61,14 +57,7 @@ public class ProtobufLogWriter extends WriterBase {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf) throws IOException {
     assert this.output == null;
-    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (doCompress) {
-      try {
-        this.compressionContext = new CompressionContext(LRUDictionary.class);
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
       HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
     int bufferSize = FSUtils.getDefaultBufferSize(fs);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index dab80fea28f..2b9337b77a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 @InterfaceAudience.Private
 public abstract class ReaderBase implements HLog.Reader {
@@ -67,7 +68,8 @@ public abstract class ReaderBase implements HLog.Reader {
       // If compression is enabled, new dictionaries are created here.
       try {
         if (compressionContext == null) {
-          compressionContext = new CompressionContext(LRUDictionary.class);
+          compressionContext = new CompressionContext(LRUDictionary.class,
+              FSUtils.isRecoveredEdits(path));
         } else {
           compressionContext.clear();
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
new file mode 100644
index 00000000000..2d43c9bf120
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Base class for Writers that holds the context used by our wal dictionary
+ * compressor; the context is null if custom dictionary compression is disabled.
+ */
+@InterfaceAudience.Private
+public abstract class WriterBase implements HLog.Writer {
+
+  protected CompressionContext compressionContext;
+
+  public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
+    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    if (doCompress) {
+      try {
+        this.compressionContext = new CompressionContext(LRUDictionary.class,
+            FSUtils.isRecoveredEdits(path));
+      } catch (Exception e) {
+        throw new IOException("Failed to initiate CompressionContext", e);
+      }
+    }
+    return doCompress;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index c7e9281a5fa..e74fdb7eadf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1321,6 +1321,15 @@ public abstract class FSUtils {
     return tabledirs;
   }
 
+  /**
+   * Checks whether the given path is under a 'recovered.edits' directory.
+   * @param path path to check
+   * @return {@code true} if the path contains the recovered edits directory name
+   */
+  public static boolean isRecoveredEdits(Path path) {
+    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
+  }
+
   /**
    * Filter for all dirs that don't start with '.'
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 672446e6eac..d7701b77c5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
  * SequenceFile.Writer. Legacy implementation only used for compat tests.
  */
 @InterfaceAudience.Private
-public class SequenceFileLogWriter implements HLog.Writer {
+public class SequenceFileLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   // The sequence file we delegate to.
   private SequenceFile.Writer writer;
@@ -58,13 +58,6 @@ public class SequenceFileLogWriter extends WriterBase {
   private static final Text WAL_VERSION_KEY = new Text("version");
   private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
   private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
-
-  /**
-   * Context used by our wal dictionary compressor. Null if we're not to do
-   * our custom dictionary compression. This custom WAL compression is distinct
-   * from sequencefile native compression.
-   */
-  private CompressionContext compressionContext;
 
   /**
    * Default constructor.
@@ -72,7 +65,6 @@ public class SequenceFileLogWriter extends WriterBase {
    */
   public SequenceFileLogWriter() {
     super();
   }
-
   /**
    * Create sequence file Metadata for our WAL file with version and compression
    * type (if any).
@@ -94,19 +86,7 @@ public class SequenceFileLogWriter extends WriterBase {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
   throws IOException {
-    // Should we do our custom WAL compression?
-    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (compress) {
-      try {
-        if (this.compressionContext == null) {
-          this.compressionContext = new CompressionContext(LRUDictionary.class);
-        } else {
-          this.compressionContext.clear();
-        }
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean compress = initializeCompressionContext(conf, path);
 
     // Create a SF.Writer instance.
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
index f4ae5fa997f..06f0df5c99f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
@@ -74,6 +74,7 @@ public class TestCompressor {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     Dictionary dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] blahBytes = Bytes.toBytes("blah");
     Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
     dos.close();
@@ -81,6 +82,7 @@ public class TestCompressor {
 
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dosbytes));
     dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] product = Compressor.readCompressed(dis, dictionary);
     assertTrue(Bytes.equals(blahBytes, product));
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
index 8fa7fe883b6..ba570565032 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
@@ -64,7 +64,7 @@ public class TestKeyValueCompression {
   }
 
   private void runTestCycle(List<KeyValue> kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     for (KeyValue kv : kvs) {
       KeyValueCompression.writeKV(buf, kv, ctx);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
index 99983a2f5cc..a66982358d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
@@ -41,6 +41,7 @@ public class TestLRUDictionary {
   @Before
   public void setUp() throws Exception {
     testee = new LRUDictionary();
+    testee.init(Short.MAX_VALUE);
   }
 
   @Test
@@ -110,7 +111,7 @@ public class TestLRUDictionary {
   @Test
   public void TestLRUPolicy(){
     //start by filling the dictionary up with byte arrays
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
       testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
           (BigInteger.valueOf(i)).toByteArray().length);
     }
@@ -132,13 +133,13 @@ public class TestLRUDictionary {
     assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
         BigInteger.ZERO.toByteArray().length) != -1);
     // Now go from beyond 1 to the end.
-    for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for(int i = 1; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
           BigInteger.valueOf(i).toByteArray().length) == -1);
     }
 
     // check we can find all of these.
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
           BigInteger.valueOf(i).toByteArray().length) != -1);
     }
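
Reviewer note, not part of the patch: the sketch below shows how the pieces above are expected to fit together. The class name and paths are hypothetical, chosen only for illustration; it sits in the wal package because CompressionContext is package-private. A dictionary is now inert until init(int) sizes its backing store, and CompressionContext shrinks the region and table dictionaries to a single slot for recovered-edits files, since such a file only ever carries one region name and one table name.

    package org.apache.hadoop.hbase.regionserver.wal;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    // Hypothetical usage sketch exercising the new two-phase Dictionary lifecycle.
    public class RecoveredEditsCompressionSketch {
      public static void main(String[] args) throws Exception {
        // No more eager Short.MAX_VALUE node array: the dictionary is
        // unusable until init(...) allocates its backing store.
        LRUDictionary dict = new LRUDictionary();
        dict.init(Short.MAX_VALUE);

        // Example recovered-edits path (hypothetical region and sequence ids).
        Path edits = new Path("/hbase/t1/1588230740/recovered.edits/0000000000000001000");
        boolean recovered = FSUtils.isRecoveredEdits(edits); // true: contains "recovered.edits"

        // Region/table dictionaries get one slot for recovered edits,
        // full Short.MAX_VALUE capacity for a regular WAL.
        CompressionContext ctx = new CompressionContext(LRUDictionary.class, recovered);
      }
    }

Since log splitting produces one recovered-edits file per region, the old fixed-size allocation bought nothing there; sizing per use keeps replay memory proportional to what the file can actually reference.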