From 518e3c72d6e4792cbc6554b40e7bb8c4830ab6b9 Mon Sep 17 00:00:00 2001
From: anoopsamjohn
Date: Sat, 1 Mar 2014 10:50:50 +0000
Subject: [PATCH] HBASE-10451 Enable back Tag compression on HFiles.(Anoop)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1573149 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/HColumnDescriptor.java       |   3 +-
 .../hbase/io/TagCompressionContext.java       |  13 +-
 .../io/encoding/BufferedDataBlockEncoder.java |  78 ++++++++----
 .../hbase/io/TestTagCompressionContext.java   |   4 +-
 .../hadoop/hbase/regionserver/HStore.java     |   6 +-
 .../regionserver/wal/CompressionContext.java  |   2 +-
 .../hbase/io/encoding/TestEncodedSeekers.java |  15 +--
 ...estStoreFileScannerWithTagCompression.java | 112 ++++++++++++++++++
 8 files changed, 187 insertions(+), 46 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 90a4e519b9e..ee1124912d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -195,9 +195,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
 
   /**
    * Default compress tags along with any type of DataBlockEncoding.
-   * Disabled to false by default in 0.98.0
    */
-  public static final boolean DEFAULT_COMPRESS_TAGS = false;
+  public static final boolean DEFAULT_COMPRESS_TAGS = true;
 
   private final static Map<String, String> DEFAULT_VALUES = new HashMap<String, String>();
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 5df33ff0e30..7e7fe1d1895 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -41,12 +41,12 @@ import org.apache.hadoop.io.IOUtils;
 public class TagCompressionContext {
   private final Dictionary tagDict;
 
-  public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
-      NoSuchMethodException, InstantiationException, IllegalAccessException,
-      InvocationTargetException {
+  public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
+      throws SecurityException, NoSuchMethodException, InstantiationException,
+      IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
     tagDict = dictConstructor.newInstance();
-    tagDict.init(Short.MAX_VALUE);
+    tagDict.init(dictCapacity);
   }
 
   public void clear() {
@@ -131,10 +131,12 @@ public class TagCompressionContext {
    * @param dest Destination array where to write the uncompressed tags
    * @param offset Offset in destination where tags to be written
    * @param length Length of all tag bytes
+   * @return the number of bytes read from the source to uncompress all tags
   * @throws IOException
   */
-  public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+  public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
       throws IOException {
+    int srcBeginPos = src.position();
     int endOffset = offset + length;
     while (offset < endOffset) {
       byte status = src.get();
@@ -158,6 +160,7 @@ public class TagCompressionContext {
         offset += tagLen;
       }
     }
+    return src.position() - srcBeginPos;
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 513e2e22e9b..d95ee737c11 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -52,12 +52,20 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
     HFileBlockDefaultDecodingContext decodingCtx =
         (HFileBlockDefaultDecodingContext) blkDecodingCtx;
-    if (decodingCtx.getHFileContext().isCompressTags()) {
-      try {
-        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
-        decodingCtx.setTagCompressionContext(tagCompressionContext);
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize TagCompressionContext", e);
+    if (decodingCtx.getHFileContext().isIncludesTags()
+        && decodingCtx.getHFileContext().isCompressTags()) {
+      if (decodingCtx.getTagCompressionContext() != null) {
+        // It would be needless overhead to create a new TagCompressionContext for every
+        // block decoded; clear and reuse the existing one.
+        decodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          decodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
       }
     }
     return internalDecodeKeyValues(source, 0, 0, decodingCtx);
@@ -70,6 +78,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     protected int lastCommonPrefix;
     protected int tagsLength = 0;
     protected int tagsOffset = -1;
+    protected int tagsCompressedLength = 0;
+    protected boolean uncompressTags = true;
 
     /** We need to store a copy of the key. */
    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
 
@@ -84,6 +94,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
     protected void invalidate() {
       valueOffset = -1;
+      tagsCompressedLength = 0;
+      uncompressTags = true;
     }
 
     protected void ensureSpaceForKey() {
@@ -160,7 +172,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       this.decodingCtx = decodingCtx;
       if (decodingCtx.getHFileContext().isCompressTags()) {
         try {
-          tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+          tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
         } catch (Exception e) {
           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
         }
@@ -249,6 +261,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     @Override
     public void rewind() {
       currentBuffer.rewind();
+      if (tagCompressionContext != null) {
+        tagCompressionContext.clear();
+      }
       decodeFirst();
       previous.invalidate();
     }
@@ -266,13 +281,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     protected void decodeTags() {
       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
       if (tagCompressionContext != null) {
-        // Tag compression is been used. uncompress it into tagsBuffer
-        current.ensureSpaceForTags();
-        try {
-          tagCompressionContext.uncompressTags(currentBuffer, current.tagsBuffer, 0,
-              current.tagsLength);
-        } catch (IOException e) {
-          throw new RuntimeException("Exception while uncompressing tags", e);
+        if (current.uncompressTags) {
+          // Tag compression is being used; uncompress the tags into tagsBuffer.
+          current.ensureSpaceForTags();
+          try {
+            current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
+                current.tagsBuffer, 0, current.tagsLength);
+          } catch (IOException e) {
+            throw new RuntimeException("Exception while uncompressing tags", e);
+          }
+        } else {
+          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+          current.uncompressTags = true; // Reset this.
         }
         current.tagsOffset = -1;
       } else {
@@ -355,7 +375,15 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
       // move after last key value
       currentBuffer.position(current.nextKvOffset);
-
+      // The tag bytes have already been decoded. Cache the tags, and the total compressed
+      // length of the tag bytes, in the current state so that the next decodeNext() does not
+      // need to decode the tags again; decoding them a second time would pollute the data
+      // dictionary used for the compression. When current.uncompressTags is false, we just
+      // reuse current.tagsBuffer and skip 'tagsCompressedLength' bytes of the source stream.
+      // See decodeTags().
+      current.tagsBuffer = previous.tagsBuffer;
+      current.tagsCompressedLength = previous.tagsCompressedLength;
+      current.uncompressTags = false;
       previous.invalidate();
     }
 
@@ -468,12 +496,20 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
     encodingCtx.prepareEncoding();
     DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
-    if (encodingCtx.getHFileContext().isCompressTags()) {
-      try {
-        TagCompressionContext tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
-        encodingCtx.setTagCompressionContext(tagCompressionContext);
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize TagCompressionContext", e);
+    if (encodingCtx.getHFileContext().isIncludesTags()
+        && encodingCtx.getHFileContext().isCompressTags()) {
+      if (encodingCtx.getTagCompressionContext() != null) {
+        // It would be needless overhead to create a new TagCompressionContext for every
+        // block encoded; clear and reuse the existing one.
+        encodingCtx.getTagCompressionContext().clear();
+      } else {
+        try {
+          TagCompressionContext tagCompressionContext = new TagCompressionContext(
+              LRUDictionary.class, Byte.MAX_VALUE);
+          encodingCtx.setTagCompressionContext(tagCompressionContext);
+        } catch (Exception e) {
+          throw new IOException("Failed to initialize TagCompressionContext", e);
+        }
       }
     }
     internalEncodeKeyValues(dataOut, in, encodingCtx);
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index f0892f700de..82739b914dc 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -45,7 +45,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTags1() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(2);
     short tagsLength1 = kv1.getTagsLength();
     ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
@@ -71,7 +71,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTags2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(1);
     short tagsLength1 = kv1.getTagsLength();
     context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f7dfb174705..bd9d7918ad7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -891,15 +891,11 @@ public class HStore implements Store {
     if (compression == null) {
       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
     }
-    if(family.shouldCompressTags()) {
-      LOG.warn("HFile tag compression attribute ignored for '" + family.getNameAsString()
-          + "', see HBASE-10443.");
-    }
     HFileContext hFileContext = new HFileContextBuilder()
         .withIncludesMvcc(includeMVCCReadpoint)
         .withIncludesTags(includesTag)
         .withCompression(compression)
-        .withCompressTags(false)
+        .withCompressTags(family.shouldCompressTags())
         .withChecksumType(checksumType)
         .withBytesPerCheckSum(bytesPerChecksum)
         .withBlockSize(blocksize)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 26e80cb7f0e..5f21bfcbbf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -64,7 +64,7 @@ class CompressionContext {
     familyDict.init(Byte.MAX_VALUE);
     qualifierDict.init(Byte.MAX_VALUE);
     if (hasTagCompression) {
-      tagCompressionContext = new TagCompressionContext(dictType);
+      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
index 90a90630b39..bc87acdc741 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
@@ -70,7 +70,6 @@ public class TestEncodedSeekers {
 
   private final HBaseTestingUtility testUtil = HBaseTestingUtility.createLocalHTU();
   private final DataBlockEncoding encoding;
-  private final boolean encodeOnDisk;
   private final boolean includeTags;
   private final boolean compressTags;
 
@@ -82,28 +81,24 @@ public class TestEncodedSeekers {
     List<Object[]> paramList = new ArrayList<Object[]>();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
       for (boolean includeTags : new boolean[] { false, true }) {
-        for (boolean encodeOnDisk : new boolean[] { false, true }) {
-          for (boolean compressTags : new boolean[] { false, true }) {
-            paramList.add(new Object[] { encoding, encodeOnDisk, includeTags, compressTags });
-          }
+        for (boolean compressTags : new boolean[] { false, true }) {
+          paramList.add(new Object[] { encoding, includeTags, compressTags });
         }
       }
     }
     return paramList;
   }
 
-  public TestEncodedSeekers(DataBlockEncoding encoding, boolean encodeOnDisk, boolean includeTags,
-      boolean compressTags) {
+  public TestEncodedSeekers(DataBlockEncoding encoding, boolean includeTags, boolean compressTags) {
     this.encoding = encoding;
-    this.encodeOnDisk = encodeOnDisk;
    this.includeTags = includeTags;
     this.compressTags = compressTags;
   }
 
   @Test
   public void testEncodedSeeker() throws IOException {
-    System.err.println("Testing encoded seekers for encoding : " + encoding + ", encodeOnDisk : "
-        + encodeOnDisk + ", includeTags : " + includeTags + ", compressTags : " + compressTags);
+    System.err.println("Testing encoded seekers for encoding : " + encoding + ", includeTags : "
+        + includeTags + ", compressTags : " + compressTags);
     if(includeTags) {
       testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
new file mode 100644
index 00000000000..7a0a722e807
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStoreFileScannerWithTagCompression {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  private static CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
+  private static String ROOT_DIR = TEST_UTIL.getDataTestDir(
+      "TestStoreFileScannerWithTagCompression").toString();
+  private static FileSystem fs = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    conf.setInt("hfile.format.version", 3);
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testReseek() throws Exception {
+    // write the file
+    Path f = new Path(ROOT_DIR, "testReseek");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).withIncludesTags(true)
+        .withCompressTags(true).withDataBlockEncoding(DataBlockEncoding.PREFIX).build();
+    // Make a store file and write data to it.
+    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs).withFilePath(f)
+        .withFileContext(meta).build();
+
+    writeStoreFile(writer);
+    writer.close();
+
+    StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf);
+    StoreFileScanner s = reader.getStoreFileScanner(false, false);
+    try {
+      // Reseek to the first possible KV on row 'k2' and then read forward from there.
+      KeyValue k = KeyValue.createFirstOnRow(Bytes.toBytes("k2"));
+      s.reseek(k);
+      KeyValue kv = s.next();
+      kv = s.next();
+      kv = s.next();
+      byte[] key5 = Bytes.toBytes("k5");
+      assertTrue(Bytes.equals(key5, 0, key5.length, kv.getRowArray(), kv.getRowOffset(),
+          kv.getRowLength()));
+      List<Tag> tags = kv.getTags();
+      assertEquals(1, tags.size());
+      assertEquals("tag3", Bytes.toString(tags.get(0).getValue()));
+    } finally {
+      s.close();
+    }
+  }
+
+  private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
+    byte[] fam = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    long now = System.currentTimeMillis();
+    byte[] b = Bytes.toBytes("k1");
+    Tag t1 = new Tag((byte) 1, "tag1");
+    Tag t2 = new Tag((byte) 2, "tag2");
+    Tag t3 = new Tag((byte) 3, "tag3");
+    try {
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t1 }));
+      b = Bytes.toBytes("k3");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t2, t1 }));
+      b = Bytes.toBytes("k4");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+      b = Bytes.toBytes("k5");
+      writer.append(new KeyValue(b, fam, qualifier, now, b, new Tag[] { t3 }));
+    } finally {
+      writer.close();
+    }
+  }
+}
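
---
Reviewer notes (illustrative sketches, not part of the diff above):

1) With DEFAULT_COMPRESS_TAGS back to true, tag compression on HFiles simply
follows the column family descriptor again. A minimal sketch of the per-family
knob, using the existing HColumnDescriptor setters (the family name "f" is
made up):

    HColumnDescriptor hcd = new HColumnDescriptor("f");
    hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX);
    // Tags are compressed along with a DataBlockEncoding; the default is now
    // true, so opt out explicitly where tag compression is unwanted.
    hcd.setCompressTags(false);

HStore then picks the setting up through family.shouldCompressTags() when it
builds the HFileContext for new store files, as in the HStore.java hunk above.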
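
2) TagCompressionContext now takes the dictionary capacity as a constructor
argument, and uncompressTags() returns how many compressed bytes it consumed
from the source buffer, which is what lets the seeker skip an already-decoded
tag block. A rough round trip in the spirit of TestTagCompressionContext,
assuming 'kv' is a KeyValue that carries tags:

    TagCompressionContext ctx =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    short tagsLength = kv.getTagsLength();
    // Compress the tag block of one KV into the stream.
    ctx.compressTags(baos, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);

    // Uncompress it again; 'consumed' is the compressed size in the source.
    ByteBuffer src = ByteBuffer.wrap(baos.toByteArray());
    byte[] dest = new byte[tagsLength];
    int consumed = ctx.uncompressTags(src, dest, 0, tagsLength);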
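
3) The dictionary capacities are intentionally different per call site. The
HFile block encode/decode paths create small per-context dictionaries that are
cleared and reused, while the WAL path passes Short.MAX_VALUE, the capacity
the old no-arg constructor hardcoded, so WAL tag compression behaviour is
unchanged:

    // HFile block encoding/decoding and encoded seekers:
    new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
    // WAL compression (CompressionContext), same capacity as before:
    new TagCompressionContext(dictType, Short.MAX_VALUE);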