HBASE-10451 Enable back Tag compression on HFiles.(Anoop)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1573149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
anoopsamjohn 2014-03-01 10:50:50 +00:00
parent 05139ace54
commit 518e3c72d6
8 changed files with 187 additions and 46 deletions

View File

@ -195,9 +195,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
/**
* Default compress tags along with any type of DataBlockEncoding.
* Disabled to false by default in 0.98.0
*/
public static final boolean DEFAULT_COMPRESS_TAGS = false;
public static final boolean DEFAULT_COMPRESS_TAGS = true;
private final static Map<String, String> DEFAULT_VALUES
= new HashMap<String, String>();

View File

@ -41,12 +41,12 @@ import org.apache.hadoop.io.IOUtils;
public class TagCompressionContext {
private final Dictionary tagDict;
public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
NoSuchMethodException, InstantiationException, IllegalAccessException,
InvocationTargetException {
public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
tagDict = dictConstructor.newInstance();
tagDict.init(Short.MAX_VALUE);
tagDict.init(dictCapacity);
}
public void clear() {
@ -131,10 +131,12 @@ public class TagCompressionContext {
* @param dest Destination array where to write the uncompressed tags
* @param offset Offset in destination where tags to be written
* @param length Length of all tag bytes
* @return bytes count read from source to uncompress all tags.
* @throws IOException
*/
public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
throws IOException {
int srcBeginPos = src.position();
int endOffset = offset + length;
while (offset < endOffset) {
byte status = src.get();
@ -158,6 +160,7 @@ public class TagCompressionContext {
offset += tagLen;
}
}
return src.position() - srcBeginPos;
}
/**

View File

@ -52,12 +52,20 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
HFileBlockDefaultDecodingContext decodingCtx =
(HFileBlockDefaultDecodingContext) blkDecodingCtx;
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
decodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
if (decodingCtx.getHFileContext().isIncludesTags()
&& decodingCtx.getHFileContext().isCompressTags()) {
if (decodingCtx.getTagCompressionContext() != null) {
// It will be overhead to create the TagCompressionContext again and again for every block
// decoding.
decodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
decodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
return internalDecodeKeyValues(source, 0, 0, decodingCtx);
@ -70,6 +78,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected int lastCommonPrefix;
protected int tagsLength = 0;
protected int tagsOffset = -1;
protected int tagsCompressedLength = 0;
protected boolean uncompressTags = true;
/** We need to store a copy of the key. */
protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
@ -84,6 +94,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected void invalidate() {
valueOffset = -1;
tagsCompressedLength = 0;
uncompressTags = true;
}
protected void ensureSpaceForKey() {
@ -160,7 +172,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
this.decodingCtx = decodingCtx;
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize TagCompressionContext", e);
}
@ -249,6 +261,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public void rewind() {
currentBuffer.rewind();
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
decodeFirst();
previous.invalidate();
}
@ -266,13 +281,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected void decodeTags() {
current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
if (tagCompressionContext != null) {
// Tag compression is been used. uncompress it into tagsBuffer
current.ensureSpaceForTags();
try {
tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
current.tagsLength);
} catch (IOException e) {
throw new RuntimeException("Exception while uncompressing tags", e);
if (current.uncompressTags) {
// Tag compression is been used. uncompress it into tagsBuffer
current.ensureSpaceForTags();
try {
current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
current.tagsBuffer, 0, current.tagsLength);
} catch (IOException e) {
throw new RuntimeException("Exception while uncompressing tags", e);
}
} else {
ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
current.uncompressTags = true;// Reset this.
}
current.tagsOffset = -1;
} else {
@ -355,7 +375,15 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// move after last key value
currentBuffer.position(current.nextKvOffset);
// Already decoded the tag bytes. We cache this tags into current state and also the total
// compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
// the tags again. This might pollute the Data Dictionary what we use for the compression.
// When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
// 'tagsCompressedLength' bytes of source stream.
// See in decodeTags()
current.tagsBuffer = previous.tagsBuffer;
current.tagsCompressedLength = previous.tagsCompressedLength;
current.uncompressTags = false;
previous.invalidate();
}
@ -468,12 +496,20 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding();
DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
if (encodingCtx.getHFileContext().isCompressTags()) {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
encodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
if (encodingCtx.getHFileContext().isIncludesTags()
&& encodingCtx.getHFileContext().isCompressTags()) {
if (encodingCtx.getTagCompressionContext() != null) {
// It will be overhead to create the TagCompressionContext again and again for every block
// encoding.
encodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
encodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
internalEncodeKeyValues(dataOut, in, encodingCtx);

View File

@ -45,7 +45,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTags1() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(2);
short tagsLength1 = kv1.getTagsLength();
ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
@ -71,7 +71,7 @@ public class TestTagCompressionContext {
@Test
public void testCompressUncompressTags2() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(1);
short tagsLength1 = kv1.getTagsLength();
context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);

View File

@ -891,15 +891,11 @@ public class HStore implements Store {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
if(family.shouldCompressTags()) {
LOG.warn("HFile tag compression attribute ignored for '" + family.getNameAsString()
+ "', see HBASE-10443.");
}
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag)
.withCompression(compression)
.withCompressTags(false)
.withCompressTags(family.shouldCompressTags())
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
.withBlockSize(blocksize)

View File

@ -64,7 +64,7 @@ class CompressionContext {
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType);
tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
}

View File

@ -70,7 +70,6 @@ public class TestEncodedSeekers {
private final HBaseTestingUtility testUtil = HBaseTestingUtility.createLocalHTU();
private final DataBlockEncoding encoding;
private final boolean encodeOnDisk;
private final boolean includeTags;
private final boolean compressTags;
@ -82,28 +81,24 @@ public class TestEncodedSeekers {
List<Object[]> paramList = new ArrayList<Object[]>();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
for (boolean includeTags : new boolean[] { false, true }) {
for (boolean encodeOnDisk : new boolean[] { false, true }) {
for (boolean compressTags : new boolean[] { false, true }) {
paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
}
for (boolean compressTags : new boolean[] { false, true }) {
paramList.add(new Object[] { encoding, includeTags, compressTags });
}
}
}
return paramList;
}
public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
boolean compressTags) {
public TestEncodedSeekers(DataBlockEncoding encoding, boolean includeTags, boolean compressTags) {
this.encoding = encoding;
this.encodeOnDisk = encodeOnDisk;
this.includeTags = includeTags;
this.compressTags = compressTags;
}
@Test
public void testEncodedSeeker() throws IOException {
System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
+ encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
System.err.println("Testing encoded seekers for encoding : " + encoding + ", includeTags : "
+ includeTags + ", compressTags : " + compressTags);
if(includeTags) {
testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
}

View File

@ -0,0 +1,112 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestStoreFileScannerWithTagCompression {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf = TEST_UTIL.getConfiguration();
private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
private static String ROOT_DIR = TEST_UTIL.getDataTestDir(
"TestStoreFileScannerWithTagCompression").toString();
private static FileSystem fs = null;
@BeforeClass
public static void setUp() throws IOException {
conf.setInt("hfile.format.version", 3);
fs = FileSystem.get(conf);
}
@Test
public void testReseek() throws Exception {
// write the file
Path f = new Path(ROOT_DIR, "testReseek");
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
.withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
// Make a store file and write data to it.
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs).withFilePath(f)
.withFileContext(meta).build();
writeStoreFile(writer);
writer.close();
StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
StoreFileScanner s = reader.getStoreFileScanner(false, false);
try {
// Now do reseek with empty KV to position to the beginning of the file
KeyValue k = KeyValue.createFirstOnRow(Bytes.toBytes("k2"));
s.reseek(k);
KeyValue kv = s.next();
kv = s.next();
kv = s.next();
byte[] key5 = Bytes.toBytes("k5");
assertTrue(Bytes.equals(key5, 0, key5.length, kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength()));
List<Tag> tags = kv.getTags();
assertEquals(1, tags.size());
assertEquals("tag3", Bytes.toString(tags.get(0).getValue()));
} finally {
s.close();
}
}
private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
byte[] fam = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("q");
long now = System.currentTimeMillis();
byte[] b = Bytes.toBytes("k1");
Tag t1 = new Tag((byte) 1, "tag1");
Tag t2 = new Tag((byte) 2, "tag2");
Tag t3 = new Tag((byte) 3, "tag3");
try {
writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t1 }));
b = Bytes.toBytes("k3");
writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t2, t1 }));
b = Bytes.toBytes("k4");
writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
b = Bytes.toBytes("k5");
writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
} finally {
writer.close();
}
}
}