HBASE-9045 Support Dictionary based Tag compression in HFiles
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1537377 13f79535-47bb-0310-9956-ffa450edef68
parent 94aa409410
commit 68db456397

HColumnDescriptor.java
@@ -94,6 +94,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
   public static final String MIN_VERSIONS = "MIN_VERSIONS";
   public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
+  public static final String COMPRESS_TAGS = "COMPRESS_TAGS";
 
   /**
    * Default compression type.
@@ -187,6 +188,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
    */
  public static final boolean DEFAULT_EVICT_BLOCKS_ON_CLOSE = false;

+  /**
+   * Default for compressing tags along with any type of DataBlockEncoding.
+   */
+  public static final boolean DEFAULT_COMPRESS_TAGS = true;
+
   private final static Map<String, String> DEFAULT_VALUES
       = new HashMap<String, String>();
   private final static Set<ImmutableBytesWritable> RESERVED_KEYWORDS
@@ -674,6 +680,30 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     return setValue(DATA_BLOCK_ENCODING, name);
   }
 
+  /**
+   * Set whether the tags should be compressed along with DataBlockEncoding. When no
+   * DataBlockEncoding is in use, this has no effect.
+   *
+   * @param compressTags
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setCompressTags(boolean compressTags) {
+    return setValue(COMPRESS_TAGS, String.valueOf(compressTags));
+  }
+
+  /**
+   * @return Whether KV tags should be compressed along with DataBlockEncoding. When no
+   * DataBlockEncoding is in use, this has no effect.
+   */
+  public boolean shouldCompressTags() {
+    String compressTagsStr = getValue(COMPRESS_TAGS);
+    boolean compressTags = DEFAULT_COMPRESS_TAGS;
+    if (compressTagsStr != null) {
+      compressTags = Boolean.valueOf(compressTagsStr);
+    }
+    return compressTags;
+  }
+
   /**
    * @return Compression type setting.
    */

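Seen from the client side, the new attribute is one more chained setter on the column descriptor. A minimal usage sketch in Java follows; the family name and the FAST_DIFF encoding are illustrative choices rather than part of this commit, and per the javadoc above some DataBlockEncoding must be set for the flag to take effect:

    // Hedged example: turn on dictionary-based tag compression for one family.
    HColumnDescriptor hcd = new HColumnDescriptor("f1");   // "f1" is hypothetical
    hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // any encoding other than NONE
    hcd.setCompressTags(true);                             // stored as the COMPRESS_TAGS attribute
    assert hcd.shouldCompressTags();                       // reads the attribute back
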
TagCompressionContext.java
@@ -23,17 +23,19 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
 
 /**
  * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This
- * will be used for compressing tags while writing into WALs.
+ * will be used for compressing tags while writing into HFiles and WALs.
  */
 @InterfaceAudience.Private
 public class TagCompressionContext {
@@ -52,7 +54,7 @@ public class TagCompressionContext {
   }
 
   /**
-   * Compress tags one by one and writes the OutputStream.
+   * Compress tags one by one and writes to the OutputStream.
    * @param out Stream to which the compressed tags to be written
    * @param in Source where tags are available
    * @param offset Offset for the tags bytes
@@ -72,6 +74,24 @@ public class TagCompressionContext {
     }
   }
 
+  /**
+   * Compress tags one by one and writes to the OutputStream.
+   * @param out Stream to which the compressed tags to be written
+   * @param in Source buffer where tags are available
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void compressTags(OutputStream out, ByteBuffer in, short length) throws IOException {
+    if (in.hasArray()) {
+      compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
+      ByteBufferUtils.skip(in, length);
+    } else {
+      byte[] tagBuf = new byte[length];
+      in.get(tagBuf);
+      compressTags(out, tagBuf, 0, length);
+    }
+  }
+
   /**
    * Uncompress tags from the InputStream and writes to the destination array.
    * @param src Stream where the compressed tags are available
@@ -105,6 +125,58 @@ public class TagCompressionContext {
     }
   }
 
+  /**
+   * Uncompress tags from the input ByteBuffer and writes to the destination array.
+   * @param src Buffer where the compressed tags are available
+   * @param dest Destination array where to write the uncompressed tags
+   * @param offset Offset in destination where tags to be written
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+      throws IOException {
+    int endOffset = offset + length;
+    while (offset < endOffset) {
+      byte status = src.get();
+      short tagLen;
+      if (status == Dictionary.NOT_IN_DICTIONARY) {
+        // We are writing short as tagLen. So can downcast this without any risk.
+        tagLen = (short) StreamUtils.readRawVarint32(src);
+        offset = Bytes.putShort(dest, offset, tagLen);
+        src.get(dest, offset, tagLen);
+        tagDict.addEntry(dest, offset, tagLen);
+        offset += tagLen;
+      } else {
+        short dictIdx = StreamUtils.toShort(status, src.get());
+        byte[] entry = tagDict.getEntry(dictIdx);
+        if (entry == null) {
+          throw new IOException("Missing dictionary entry for index " + dictIdx);
+        }
+        tagLen = (short) entry.length;
+        offset = Bytes.putShort(dest, offset, tagLen);
+        System.arraycopy(entry, 0, dest, offset, tagLen);
+        offset += tagLen;
+      }
+    }
+  }
+
+  /**
+   * Uncompress tags from the InputStream and writes to the destination buffer.
+   * @param src Stream where the compressed tags are available
+   * @param dest Destination buffer where to write the uncompressed tags
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void uncompressTags(InputStream src, ByteBuffer dest, short length) throws IOException {
+    if (dest.hasArray()) {
+      uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
+    } else {
+      byte[] tagBuf = new byte[length];
+      uncompressTags(src, tagBuf, 0, length);
+      dest.put(tagBuf);
+    }
+  }
+
   private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
     short dictIdx = Dictionary.NOT_IN_DICTIONARY;
     if (tagDict != null) {

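Read as a pair, the compress and uncompress methods above fix a simple per-tag wire layout. The summary below is inferred from this hunk alone and adds no behavior:

    // Per tag, the compressed stream holds one of two forms:
    //   dictionary miss: [Dictionary.NOT_IN_DICTIONARY status byte]
    //                    [tag length as a raw varint][tag bytes],
    //                    and the tag is added to the LRU dictionary on both sides
    //   dictionary hit:  [two-byte dictionary index], i.e. the status byte plus
    //                    the next byte, recombined via StreamUtils.toShort
    // The uncompress side rebuilds the original <2-byte length><tag bytes>
    // serialization in the destination, keeping its dictionary in step.
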
BufferedDataBlockEncoder.java
@@ -26,8 +26,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
@@ -50,6 +52,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
     HFileBlockDefaultDecodingContext decodingCtx =
         (HFileBlockDefaultDecodingContext) blkDecodingCtx;
+    if (decodingCtx.getHFileContext().shouldCompressTags()) {
+      try {
+        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        decodingCtx.setTagCompressionContext(tagCompressionContext);
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize TagCompressionContext", e);
+      }
+    }
     return internalDecodeKeyValues(source, 0, 0, decodingCtx);
   }
 
@@ -58,11 +68,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     protected int keyLength;
     protected int valueLength;
     protected int lastCommonPrefix;
-    protected int tagLength = 0;
-    protected int tagOffset = -1;
+    protected int tagsLength = 0;
+    protected int tagsOffset = -1;
 
     /** We need to store a copy of the key. */
     protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
+    protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
 
     protected long memstoreTS;
     protected int nextKvOffset;
@@ -88,6 +99,19 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       }
     }
 
+    protected void ensureSpaceForTags() {
+      if (tagsLength > tagsBuffer.length) {
+        // Rare case, but we need to handle tags of arbitrary length
+        int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
+        while (tagsLength > newTagsBufferLength) {
+          newTagsBufferLength *= 2;
+        }
+        byte[] newTagsBuffer = new byte[newTagsBufferLength];
+        System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
+        tagsBuffer = newTagsBuffer;
+      }
+    }
+
     /**
      * Copy the state from the next one into this instance (the previous state
     * placeholder). Used to save the previous state when we are advancing the
@@ -127,6 +151,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     protected ByteBuffer currentBuffer;
     protected STATE current = createSeekerState(); // always valid
     protected STATE previous = createSeekerState(); // may not be valid
+    protected TagCompressionContext tagCompressionContext = null;
 
     public BufferedEncodedSeeker(KVComparator comparator,
         HFileBlockDecodingContext decodingCtx) {
@@ -137,6 +162,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         this.samePrefixComparator = null;
       }
       this.decodingCtx = decodingCtx;
+      if (decodingCtx.getHFileContext().shouldCompressTags()) {
+        try {
+          tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        } catch (Exception e) {
+          throw new RuntimeException("Failed to initialize TagCompressionContext", e);
+        }
+      }
     }
 
     protected boolean includesMvcc() {
@@ -183,17 +215,25 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       kvBuffer.put(currentBuffer.array(),
           currentBuffer.arrayOffset() + current.valueOffset,
           current.valueLength);
-      if (current.tagLength > 0) {
-        kvBuffer.putShort((short) current.tagLength);
-        kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagOffset,
-            current.tagLength);
+      if (current.tagsLength > 0) {
+        kvBuffer.putShort((short) current.tagsLength);
+        if (current.tagsOffset != -1) {
+          // The offset of the tag bytes in the underlying buffer is marked, so the temp
+          // buffer tagsBuffer was not used.
+          kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
+              current.tagsLength);
+        } else {
+          // When tagsOffset is -1, tag compression was in use and the tags were
+          // uncompressed into the temp buffer tagsBuffer. Copy them from there.
+          kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
+        }
       }
       return kvBuffer;
     }
 
     protected ByteBuffer createKVBuffer() {
       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
-          current.valueLength, current.tagLength);
+          current.valueLength, current.tagsLength);
       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
       return kvBuffer;
     }
@@ -225,9 +265,23 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     }
 
     public void decodeTags() {
-      current.tagLength = ByteBufferUtils.readCompressedInt(currentBuffer);
-      current.tagOffset = currentBuffer.position();
-      ByteBufferUtils.skip(currentBuffer, current.tagLength);
+      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+      if (tagCompressionContext != null) {
+        // Tag compression is in use; uncompress the tags into tagsBuffer
+        current.ensureSpaceForTags();
+        try {
+          tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
+              current.tagsLength);
+        } catch (IOException e) {
+          throw new RuntimeException("Exception while uncompressing tags", e);
+        }
+        current.tagsOffset = -1;
+      } else {
+        // When tag compression is not used, avoid a temp copy of the tag bytes into tagsBuffer.
+        // Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer()
+        current.tagsOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+      }
     }
 
     @Override
@@ -320,9 +374,19 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
   protected final void afterEncodingKeyValue(ByteBuffer in,
       DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
     if (encodingCtx.getHFileContext().shouldIncludeTags()) {
-      int tagsLength = in.getShort();
+      short tagsLength = in.getShort();
       ByteBufferUtils.putCompressedInt(out, tagsLength);
-      ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+      // There are some tags to be written
+      if (tagsLength > 0) {
+        TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
+        // When tag compression is enabled, tagCompressionContext will be non-null. In that
+        // case write the tags using Dictionary compression.
+        if (tagCompressionContext != null) {
+          tagCompressionContext.compressTags(out, in, tagsLength);
+        } else {
+          ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+        }
+      }
     }
     if (encodingCtx.getHFileContext().shouldIncludeMvcc()) {
       // Copy memstore timestamp from the byte buffer to the output stream.
@@ -340,9 +404,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
   protected final void afterDecodingKeyValue(DataInputStream source,
       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
     if (decodingCtx.getHFileContext().shouldIncludeTags()) {
-      int tagsLength = ByteBufferUtils.readCompressedInt(source);
-      dest.putShort((short)tagsLength);
-      ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+      short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
+      dest.putShort(tagsLength);
+      if (tagsLength > 0) {
+        TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
+        // When tag compression was used in this file, tagCompressionContext will be
+        // non-null.
+        if (tagCompressionContext != null) {
+          tagCompressionContext.uncompressTags(source, dest, tagsLength);
+        } else {
+          ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
+        }
+      }
     }
     if (decodingCtx.getHFileContext().shouldIncludeMvcc()) {
       long memstoreTS = -1;
@@ -398,6 +471,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     DataOutputStream dataOut =
         ((HFileBlockDefaultEncodingContext) encodingCtx)
         .getOutputStreamForEncoder();
+    if (encodingCtx.getHFileContext().shouldCompressTags()) {
+      try {
+        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+        encodingCtx.setTagCompressionContext(tagCompressionContext);
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize TagCompressionContext", e);
+      }
+    }
     internalEncodeKeyValues(dataOut, in, encodingCtx);
     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
       encodingCtx.postEncoding(BlockType.ENCODED_DATA);

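The seeker changes in this file rely on a small contract between decodeTags() and getKeyValueBuffer() that is spread across hunks; the note below restates the diff rather than adding behavior:

    // After decodeTags() on SeekerState:
    //   tagsOffset >= 0  -> tags sit uncompressed in currentBuffer at tagsOffset;
    //                       no copy was made and tagsBuffer is untouched
    //   tagsOffset == -1 -> tags were dictionary-compressed on disk and were
    //                       inflated into tagsBuffer[0 .. tagsLength)
    // getKeyValueBuffer() branches on tagsOffset to copy from the right place.
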
CopyKeyDataBlockEncoder.java
@@ -67,8 +67,8 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
       current.valueOffset = currentBuffer.position();
       ByteBufferUtils.skip(currentBuffer, current.valueLength);
       if (includesTags()) {
-        current.tagLength = currentBuffer.getShort();
-        ByteBufferUtils.skip(currentBuffer, current.tagLength);
+        current.tagsLength = currentBuffer.getShort();
+        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
       }
       if (includesMvcc()) {
         current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);

HFileBlockDefaultDecodingContext.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 
@@ -38,6 +39,7 @@ public class HFileBlockDefaultDecodingContext implements
     HFileBlockDecodingContext {
 
   private final HFileContext fileContext;
+  private TagCompressionContext tagCompressionContext;
 
   public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
     this.fileContext = fileContext;
@@ -58,4 +60,12 @@ public class HFileBlockDefaultDecodingContext implements
   public HFileContext getHFileContext() {
     return this.fileContext;
   }
+
+  public TagCompressionContext getTagCompressionContext() {
+    return tagCompressionContext;
+  }
+
+  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+    this.tagCompressionContext = tagCompressionContext;
+  }
 }

HFileBlockDefaultEncodingContext.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -62,6 +63,7 @@ public class HFileBlockDefaultEncodingContext implements
   private byte[] dummyHeader;
 
   private HFileContext fileContext;
+  private TagCompressionContext tagCompressionContext;
 
   /**
    * @param encoding encoding used
@@ -193,4 +195,12 @@ public class HFileBlockDefaultEncodingContext implements
   public HFileContext getHFileContext() {
     return this.fileContext;
   }
+
+  public TagCompressionContext getTagCompressionContext() {
+    return tagCompressionContext;
+  }
+
+  public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
+    this.tagCompressionContext = tagCompressionContext;
+  }
 }

HFileContext.java
@@ -122,6 +122,10 @@ public class HFileContext implements HeapSize, Cloneable {
     return compressTags;
   }
 
+  public void setCompressTags(boolean compressTags) {
+    this.compressTags = compressTags;
+  }
+
   public ChecksumType getChecksumType() {
     return checksumType;
   }

TestTagCompressionContext.java (new file)
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestTagCompressionContext {
+
+  private static final byte[] ROW = Bytes.toBytes("r1");
+  private static final byte[] CF = Bytes.toBytes("f");
+  private static final byte[] Q = Bytes.toBytes("q");
+  private static final byte[] V = Bytes.toBytes("v");
+
+  @Test
+  public void testCompressUncompressTags1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    KeyValue kv1 = createKVWithTags(2);
+    short tagsLength1 = kv1.getTagsLength();
+    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+    context.compressTags(baos, ib, tagsLength1);
+    KeyValue kv2 = createKVWithTags(3);
+    short tagsLength2 = kv2.getTagsLength();
+    ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+    context.compressTags(baos, ib, tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
+    context.uncompressTags(ob, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(ob, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
+  public void testCompressUncompressTags2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    KeyValue kv1 = createKVWithTags(1);
+    short tagsLength1 = kv1.getTagsLength();
+    context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
+    KeyValue kv2 = createKVWithTags(3);
+    short tagsLength2 = kv2.getTagsLength();
+    context.compressTags(baos, kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    byte[] dest = new byte[tagsLength1];
+    context.uncompressTags(bais, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  private KeyValue createKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new Tag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    return kv;
+  }
+}

HFile.java
@@ -620,6 +620,7 @@ public class HFile {
     static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
     static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
     static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
+    static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
     public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
     private final SortedMap<byte [], byte []> map = new TreeMap<byte [], byte []>(Bytes.BYTES_COMPARATOR);

HFileReaderV3.java
@@ -64,6 +64,10 @@ public class HFileReaderV3 extends HFileReaderV2 {
     // max tag length is not present in the HFile means tags were not at all written to file.
     if (tmp != null) {
       hfileContext.setIncludesTags(true);
+      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+      if (tmp != null && Bytes.toBoolean(tmp)) {
+        hfileContext.setCompressTags(true);
+      }
     }
   }

|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -178,6 +179,9 @@ public class HFileWriterV3 extends HFileWriterV2 {
|
|||
// When tags are not being written in this file, MAX_TAGS_LEN is excluded
|
||||
// from the FileInfo
|
||||
fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
|
||||
boolean tagsCompressed = (hFileContext.getEncodingOnDisk() != DataBlockEncoding.NONE)
|
||||
&& hFileContext.shouldCompressTags();
|
||||
fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
HStore.java
@@ -828,6 +828,7 @@ public class HStore implements Store {
     HFileContext hFileContext = new HFileContextBuilder()
         .withIncludesMvcc(includeMVCCReadpoint)
         .withIncludesTags(includesTag)
+        .withCompressTags(family.shouldCompressTags())
         .withCompressionAlgo(compression)
         .withChecksumType(checksumType)
         .withBytesPerCheckSum(bytesPerChecksum)

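This is the point where the per-family flag from HColumnDescriptor reaches the write path. A hedged sketch of assembling such a context by hand follows; the builder methods are the ones visible in this hunk, while build() and the surrounding values are assumptions:

    // Hypothetical standalone construction; 'family' is an HColumnDescriptor.
    HFileContext context = new HFileContextBuilder()
        .withIncludesTags(true)                          // file carries tags
        .withCompressTags(family.shouldCompressTags())   // family-level switch
        .build();                                        // assumed builder terminal
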
TestEncodedSeekers.java
@@ -71,6 +71,7 @@ public class TestEncodedSeekers {
   private final DataBlockEncoding encoding;
   private final boolean encodeOnDisk;
   private final boolean includeTags;
+  private final boolean compressTags;
 
   /** Enable when debugging */
   private static final boolean VERBOSE = false;
@@ -81,22 +82,27 @@ public class TestEncodedSeekers {
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       for (boolean includeTags : new boolean[] { false, true }) {
         for (boolean encodeOnDisk : new boolean[] { false, true }) {
-          paramList.add(new Object[] { encoding, encodeOnDisk, includeTags });
+          for (boolean compressTags : new boolean[] { false, true }) {
+            paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
+          }
         }
       }
     }
     return paramList;
   }
 
-  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags) {
+  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
+      boolean compressTags) {
     this.encoding = encoding;
     this.encodeOnDisk = encodeOnDisk;
     this.includeTags = includeTags;
+    this.compressTags = compressTags;
   }
 
   @Test
   public void testEncodedSeeker() throws IOException {
-    System.err.println("Testing encoded seekers for encoding " + encoding);
+    System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
+        + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
     if(includeTags) {
       testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
     }
@@ -108,7 +114,8 @@ public class TestEncodedSeekers {
         setDataBlockEncoding(encoding).
         setEncodeOnDisk(encodeOnDisk).
         setBlocksize(BLOCK_SIZE).
-        setBloomFilterType(BloomType.NONE);
+        setBloomFilterType(BloomType.NONE).
+        setCompressTags(compressTags);
     HRegion region = testUtil.createTestRegion(TABLE_NAME, hcd);
 
     //write the data, but leave some in the memstore