HBASE-9137 Add Tag dictionary in WAL compression

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1532176 13f79535-47bb-0310-9956-ffa450edef68
anoopsamjohn 2013-10-15 03:56:36 +00:00
parent 2e5eb8bd0c
commit 16bf1c48f0
13 changed files with 341 additions and 98 deletions

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
/**
* Context that holds the dictionary for Tag compression and performs the compress/uncompress.
* This is used to compress tags while writing them into WALs.
*/
@InterfaceAudience.Private
public class TagCompressionContext {
private final Dictionary tagDict;
public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
NoSuchMethodException, InstantiationException, IllegalAccessException,
InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
tagDict = dictConstructor.newInstance();
tagDict.init(Short.MAX_VALUE);
}
public void clear() {
tagDict.clear();
}
/**
* Compress tags one by one and write them to the OutputStream.
* @param out Stream to which the compressed tags are written
* @param in Source byte array where the tags are available
* @param offset Offset in the source array where the tags start
* @param length Length of all tag bytes
* @throws IOException
*/
public void compressTags(OutputStream out, byte[] in, int offset, short length)
throws IOException {
int pos = offset;
int endOffset = pos + length;
assert pos < endOffset;
while (pos < endOffset) {
short tagLen = Bytes.toShort(in, pos);
pos += Tag.TAG_LENGTH_SIZE;
write(in, pos, tagLen, out);
pos += tagLen;
}
}
/**
* Uncompress tags from the InputStream and write them to the destination array.
* @param src Stream from which the compressed tags are read
* @param dest Destination array into which the uncompressed tags are written
* @param offset Offset in the destination array where the tags start
* @param length Length of all tag bytes
* @throws IOException
*/
public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
throws IOException {
int endOffset = offset + length;
while (offset < endOffset) {
byte status = (byte) src.read();
if (status == Dictionary.NOT_IN_DICTIONARY) {
// The tag length was written as a short, so this downcast is safe.
short tagLen = (short) StreamUtils.readRawVarint32(src);
offset = Bytes.putShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, (byte) src.read());
byte[] entry = tagDict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
}
offset = Bytes.putShort(dest, offset, (short) entry.length);
System.arraycopy(entry, 0, dest, offset, entry.length);
offset += entry.length;
}
}
}
private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
short dictIdx = Dictionary.NOT_IN_DICTIONARY;
if (tagDict != null) {
dictIdx = tagDict.findEntry(data, offset, length);
}
if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
out.write(Dictionary.NOT_IN_DICTIONARY);
StreamUtils.writeRawVInt32(out, length);
out.write(data, offset, length);
} else {
StreamUtils.writeShort(out, dictIdx);
}
}
}
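
Not part of the patch, but a minimal round-trip sketch of how the new class can be exercised, assuming an LRUDictionary-backed context on each side (writer and reader keep separate contexts, the way the WAL codec does) and the <2-byte length><tag bytes> block layout that compressTags/uncompressTags expect. The class name TagCompressionRoundTrip and the sample payload are illustrative only:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;

public class TagCompressionRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build one serialized tag block: <2-byte tag length><tag bytes>.
    byte[] tag = Bytes.toBytes("sample-tag-payload"); // illustrative payload
    byte[] block = new byte[Bytes.SIZEOF_SHORT + tag.length];
    int off = Bytes.putShort(block, 0, (short) tag.length);
    System.arraycopy(tag, 0, block, off, tag.length);

    // Compress with one context (writer side)...
    TagCompressionContext writeCtx = new TagCompressionContext(LRUDictionary.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeCtx.compressTags(out, block, 0, (short) block.length);

    // ...and uncompress with a fresh context (reader side).
    TagCompressionContext readCtx = new TagCompressionContext(LRUDictionary.class);
    byte[] restored = new byte[block.length];
    readCtx.uncompressTags(new ByteArrayInputStream(out.toByteArray()), restored, 0,
        (short) block.length);

    System.out.println(Bytes.equals(block, restored)); // expect: true
  }
}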

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
package org.apache.hadoop.hbase.io.util;
import org.apache.hadoop.classification.InterfaceAudience;
@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
* first bit is reserved for detecting whether something is compressed or not).
*/
@InterfaceAudience.Private
interface Dictionary {
public interface Dictionary {
byte NOT_IN_DICTIONARY = -1;
void init(int initialSize);

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
package org.apache.hadoop.hbase.io.util;
import java.util.HashMap;
@ -30,7 +30,8 @@ import com.google.common.base.Preconditions;
* through a hashtable. Currently has a max of 2^15 entries; it will start
* evicting if it exceeds this number. The maximum memory we expect this dictionary
* to take in the worst case is about:
* <code>(2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB</code>.
* <code>(2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes
* (these are some big names) = ~16MB</code>.
* If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
*/
@InterfaceAudience.Private
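
As a quick sanity check of the estimate in that comment (editor's arithmetic, not code from the patch; the class name below is illustrative):

public class LruDictionaryMemoryEstimate {
  public static void main(String[] args) {
    long entries = 1L << 15; // 2^15 slots per dictionary
    long dicts = 5;          // region name, row key, CF, column qualifier, table
    System.out.println(entries * dicts * 100L);  // 16,384,000 bytes  -> roughly 16 MB
    System.out.println(entries * dicts * 1024L); // 167,772,160 bytes -> roughly 160 MB
  }
}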

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import com.google.common.base.Preconditions;
/*
* It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind
* blanks out for a split-second and he starts the work by wrapping it in the most convoluted
* interface he can come up with. Custom streams that allocate memory, DataOutput that is only used
* to write single bytes... We operate on simple streams. Thus, we are going to have a simple
* implementation copy-pasted from protobuf Coded*Stream.
*/
@InterfaceAudience.Private
public class StreamUtils {
public static void writeRawVInt32(OutputStream output, int value) throws IOException {
assert value >= 0;
while (true) {
if ((value & ~0x7F) == 0) {
output.write(value);
return;
} else {
output.write((value & 0x7F) | 0x80);
value >>>= 7;
}
}
}
public static int readRawVarint32(InputStream input) throws IOException {
byte tmp = (byte) input.read();
if (tmp >= 0) {
return tmp;
}
int result = tmp & 0x7f;
if ((tmp = (byte) input.read()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = (byte) input.read()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = (byte) input.read()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = (byte) input.read()) << 28;
if (tmp < 0) {
// Discard upper 32 bits.
for (int i = 0; i < 5; i++) {
if (input.read() >= 0) {
return result;
}
}
throw new IOException("Malformed varint");
}
}
}
}
return result;
}
public static int readRawVarint32(ByteBuffer input) throws IOException {
byte tmp = input.get();
if (tmp >= 0) {
return tmp;
}
int result = tmp & 0x7f;
if ((tmp = input.get()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = input.get()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = input.get()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = input.get()) << 28;
if (tmp < 0) {
// Discard upper 32 bits.
for (int i = 0; i < 5; i++) {
if (input.get() >= 0) {
return result;
}
}
throw new IOException("Malformed varint");
}
}
}
}
return result;
}
public static short toShort(byte hi, byte lo) {
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
Preconditions.checkArgument(s >= 0);
return s;
}
public static void writeShort(OutputStream out, short v) throws IOException {
Preconditions.checkArgument(v >= 0);
out.write((byte) (0xff & (v >> 8)));
out.write((byte) (0xff & v));
}
}
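
A minimal sketch (editor's example, not part of the patch) of the relocated helpers round-tripping a value through a byte stream; the class name is illustrative and the value 300 is arbitrary:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hbase.io.util.StreamUtils;

public class StreamUtilsRoundTrip {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    StreamUtils.writeRawVInt32(out, 300);     // varint: 0xAC 0x02
    StreamUtils.writeShort(out, (short) 300); // big-endian short: 0x01 0x2C

    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    int v = StreamUtils.readRawVarint32(in);                           // 300
    short s = StreamUtils.toShort((byte) in.read(), (byte) in.read()); // 300
    System.out.println(v == 300 && s == 300); // expect: true
  }
}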

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
package org.apache.hadoop.hbase.io.util;
import static org.junit.Assert.*;

View File

@ -20,21 +20,31 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
/**
* Context that holds the various dictionaries for compression in HLog.
*/
@InterfaceAudience.Private
class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";
final Dictionary regionDict;
final Dictionary tableDict;
final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
// Context used for compressing tags
TagCompressionContext tagCompressionContext = null;
public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
throws SecurityException, NoSuchMethodException, InstantiationException,
public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
@ -54,6 +64,9 @@ class CompressionContext {
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
tagCompressionContext = new TagCompressionContext(dictType);
}
}
void clear() {
@ -62,5 +75,8 @@ class CompressionContext {
familyDict.clear();
qualifierDict.clear();
rowDict.clear();
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
}
}
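
For reference, a hedged configuration sketch (editor's example, not part of the patch): "hbase.regionserver.wal.enablecompression" is assumed to be the pre-existing WAL compression key, while the tags key mirrors the ENABLE_WAL_TAGS_COMPRESSION constant introduced above, which defaults to true. The class name is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class TagCompressionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Keep WAL dictionary compression on, but write tags uncompressed.
    conf.setBoolean("hbase.regionserver.wal.enablecompression", true);       // existing WAL switch (assumed key)
    conf.setBoolean("hbase.regionserver.wal.tags.enablecompression", false); // new key from this patch
    // A CompressionContext built with this conf leaves tagCompressionContext null,
    // so WALCellCodec falls back to writing the raw tag bytes.
  }
}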

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
@ -73,7 +74,7 @@ public abstract class ReaderBase implements HLog.Reader {
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
FSUtils.isRecoveredEdits(path));
FSUtils.isRecoveredEdits(path), conf);
} else {
compressionContext.clear();
}

View File

@ -29,11 +29,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@ -157,7 +158,8 @@ public class WALCellCodec implements Codec {
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
StreamUtils.writeRawVInt32(out, kv.getValueLength());
// To support tags
StreamUtils.writeRawVInt32(out, kv.getTagsLength());
short tagsLength = kv.getTagsLength();
StreamUtils.writeRawVInt32(out, tagsLength);
// Write row, qualifier, and family; use dictionary
// compression as they're likely to have duplicates.
@ -165,11 +167,25 @@ public class WALCellCodec implements Codec {
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
// Write the rest uncompressed.
// Write timestamp, type and value as uncompressed.
int pos = kv.getTimestampOffset();
int remainingLength = kv.getLength() + offset - pos;
out.write(kvBuffer, pos, remainingLength);
int tsTypeValLen = kv.getLength() + offset - pos;
if (tagsLength > 0) {
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
}
assert tsTypeValLen > 0;
out.write(kvBuffer, pos, tsTypeValLen);
if (tagsLength > 0) {
if (compression.tagCompressionContext != null) {
// Write tags using Dictionary compression
compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
tagsLength);
} else {
// Tag compression is disabled within the WAL compression. Just write the tag bytes as
// they are.
out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
}
}
}
private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
@ -199,7 +215,7 @@ public class WALCellCodec implements Codec {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
int tagsLength = StreamUtils.readRawVarint32(in);
short tagsLength = (short) StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@ -228,8 +244,23 @@ public class WALCellCodec implements Codec {
elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
pos += elemLen;
// the rest
IOUtils.readFully(in, backingArray, pos, length - pos);
// timestamp, type and value
int tsTypeValLen = length - pos;
if (tagsLength > 0) {
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
}
IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
pos += tsTypeValLen;
// tags
if (tagsLength > 0) {
pos = Bytes.putShort(backingArray, pos, tagsLength);
if (compression.tagCompressionContext != null) {
compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
} else {
IOUtils.readFully(in, backingArray, pos, tagsLength);
}
}
return new KeyValue(backingArray, 0, length);
}
@ -294,80 +325,4 @@ public class WALCellCodec implements Codec {
// TODO: ideally this should also encapsulate compressionContext
return this.statelessUncompressor;
}
/**
* It seems like as soon as somebody sets himself to the task of creating VInt encoding,
* his mind blanks out for a split-second and he starts the work by wrapping it in the
* most convoluted interface he can come up with. Custom streams that allocate memory,
* DataOutput that is only used to write single bytes... We operate on simple streams.
* Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream.
*/
private static class StreamUtils {
public static int computeRawVarint32Size(final int value) {
if ((value & (0xffffffff << 7)) == 0) return 1;
if ((value & (0xffffffff << 14)) == 0) return 2;
if ((value & (0xffffffff << 21)) == 0) return 3;
if ((value & (0xffffffff << 28)) == 0) return 4;
return 5;
}
static void writeRawVInt32(OutputStream output, int value) throws IOException {
assert value >= 0;
while (true) {
if ((value & ~0x7F) == 0) {
output.write(value);
return;
} else {
output.write((value & 0x7F) | 0x80);
value >>>= 7;
}
}
}
static int readRawVarint32(InputStream input) throws IOException {
byte tmp = (byte)input.read();
if (tmp >= 0) {
return tmp;
}
int result = tmp & 0x7f;
if ((tmp = (byte)input.read()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = (byte)input.read()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = (byte)input.read()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = (byte)input.read()) << 28;
if (tmp < 0) {
// Discard upper 32 bits.
for (int i = 0; i < 5; i++) {
if (input.read() >= 0) {
return result;
}
}
throw new IOException("Malformed varint");
}
}
}
}
return result;
}
static short toShort(byte hi, byte lo) {
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
Preconditions.checkArgument(s >= 0);
return s;
}
static void writeShort(OutputStream out, short v) throws IOException {
Preconditions.checkArgument(v >= 0);
out.write((byte)(0xff & (v >> 8)));
out.write((byte)(0xff & v));
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@ -40,7 +41,7 @@ public abstract class WriterBase implements HLog.Writer {
if (doCompress) {
try {
this.compressionContext = new CompressionContext(LRUDictionary.class,
FSUtils.isRecoveredEdits(path));
FSUtils.isRecoveredEdits(path), conf);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}

View File

@ -28,6 +28,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
@ -67,7 +68,7 @@ public class TestKeyValueCompression {
}
private void runTestCycle(List<KeyValue> kvs) throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
@ -84,7 +85,7 @@ public class TestKeyValueCompression {
@Test
public void testKVWithTags() throws Exception {
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -42,8 +43,19 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeKVsWithTags() throws Exception {
WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext(
LRUDictionary.class, false));
doTest(false);
}
@Test
public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
doTest(true);
}
private void doTest(boolean compressTags) throws Exception {
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
conf));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));