diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 1b38b568017..7db1c76797d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1629,6 +1630,24 @@ public final class CellUtil {
     return new FirstOnRowDeleteFamilyCell(row, fam);
   }
 
+  /**
+   * Compresses the tags to the given output stream using the TagCompressionContext
+   * @param out the output stream to which the compressed tags are written
+   * @param cell the cell whose tags are to be compressed
+   * @param tagCompressionContext the TagCompressionContext
+   * @throws IOException if an I/O error occurs while compressing the tags
+   */
+  public static void compressTags(DataOutputStream out, Cell cell,
+      TagCompressionContext tagCompressionContext) throws IOException {
+    if (cell instanceof ByteBufferedCell) {
+      tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+        ((ByteBufferedCell) cell).getTagsPosition(), cell.getTagsLength());
+    } else {
+      tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    }
+  }
+
   @InterfaceAudience.Private
   /**
    * These cells are used in reseeks/seeks to improve the read performance.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 05c4ad1b837..278dfc401ba 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -79,17 +79,24 @@ public class TagCompressionContext {
    * Compress tags one by one and writes to the OutputStream.
    * @param out Stream to which the compressed tags to be written
    * @param in Source buffer where tags are available
+   * @param offset Offset for the tags byte buffer
    * @param length Length of all tag bytes
    * @throws IOException
    */
-  public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
+  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
+      throws IOException {
     if (in.hasArray()) {
-      compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
-      ByteBufferUtils.skip(in, length);
+      compressTags(out, in.array(), offset, length);
     } else {
-      byte[] tagBuf = new byte[length];
-      in.get(tagBuf);
-      compressTags(out, tagBuf, 0, length);
+      int pos = offset;
+      int endOffset = pos + length;
+      assert pos < endOffset;
+      while (pos < endOffset) {
+        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
+        pos += Tag.TAG_LENGTH_SIZE;
+        write(in, pos, tagLen, out);
+        pos += tagLen;
+      }
     }
   }
 
@@ -167,7 +174,7 @@
    * @param src Stream where the compressed tags are available
    * @param dest Destination buffer where to write the uncompressed tags
    * @param length Length of all tag bytes
-   * @throws IOException
+   * @throws IOException when the dictionary does not have the entry
    */
   public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
     if (dest.hasArray()) {
@@ -192,4 +199,18 @@
       StreamUtils.writeShort(out, dictIdx);
     }
   }
+
+  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (tagDict != null) {
+      dictIdx = tagDict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 33e38c7982b..817b1a79476 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1002,10 +1002,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     // When tag compression is enabled, tagCompressionContext will have a not null value. Write
     // the tags using Dictionary compression in such a case
     if (tagCompressionContext != null) {
-      // TODO : Make Dictionary interface to work with BBs and then change the corresponding
-      // compress tags code to work with BB
-      tagCompressionContext
-          .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+      // Not passing tagsLength considering that parsing of the tagsLength is not costly
+      CellUtil.compressTags(out, cell, tagCompressionContext);
     } else {
       CellUtil.writeTags(out, cell, tagsLength);
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 4a3d42fec87..54677da999f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -50,6 +52,16 @@
    */
   short findEntry(byte[] data, int offset, int length);
 
+  /**
+   * Finds the index of an entry.
+   * If no entry found, we add it.
+   * @param data the ByteBuffer that we're looking up
+   * @param offset Offset into data to add to Dictionary.
+   * @param length Length beyond offset that comprises entry; must be > 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  short findEntry(ByteBuffer data, int offset, int length);
+
   /**
    * Adds an entry to the dictionary.
    * Be careful using this method. It will add an entry to the
@@ -62,7 +74,6 @@
    * @param length Length beyond offset that comprises entry; must be > 0.
   * @return the index of the entry
   */
-
  short addEntry(byte[] data, int offset, int length);
 
  /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
index 8562cf06f8a..99780ba6935 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
@@ -58,8 +60,12 @@ public class LRUDictionary implements Dictionary {
 
   @Override
   public short addEntry(byte[] data, int offset, int length) {
+    return addEntryInternal(data, offset, length, true);
+  }
+
+  private short addEntryInternal(byte[] data, int offset, int length, boolean copy) {
     if (length <= 0) return NOT_IN_DICTIONARY;
-    return backingStore.put(data, offset, length);
+    return backingStore.put(data, offset, length, copy);
   }
 
   @Override
@@ -89,16 +95,23 @@
       indexToNode = new Node[initialSize];
     }
 
-    private short put(byte[] array, int offset, int length) {
-      // We copy the bytes we want, otherwise we might be holding references to
-      // massive arrays in our dictionary (or those arrays might change)
-      byte[] stored = new byte[length];
-      Bytes.putBytes(stored, 0, array, offset, length);
+    private short put(byte[] array, int offset, int length, boolean copy) {
+      if (copy) {
+        // We copy the bytes we want, otherwise we might be holding references to
+        // massive arrays in our dictionary (or those arrays might change)
+        byte[] stored = new byte[length];
+        Bytes.putBytes(stored, 0, array, offset, length);
+        return putInternal(stored);
+      } else {
+        return putInternal(array);
+      }
+    }
 
+    private short putInternal(byte[] stored) {
       if (currSize < initSize) {
         // There is space to add without evicting.
         if (indexToNode[currSize] == null) {
-          indexToNode[currSize] = new Node();
+          indexToNode[currSize] = new ByteArrayBackedNode();
         }
         indexToNode[currSize].setContents(stored, 0, stored.length);
         setHead(indexToNode[currSize]);
@@ -117,7 +130,7 @@
 
     private short findIdx(byte[] array, int offset, int length) {
       Short s;
-      final Node comparisonNode = new Node();
+      final Node comparisonNode = new ByteArrayBackedNode();
       comparisonNode.setContents(array, offset, length);
       if ((s = nodeToIndex.get(comparisonNode)) != null) {
         moveToHead(indexToNode[s]);
@@ -127,10 +140,22 @@
       }
     }
 
+    private short findIdx(ByteBuffer buf, int offset, int length) {
+      Short s;
+      final ByteBufferBackedNode comparisonNode = new ByteBufferBackedNode();
+      comparisonNode.setContents(buf, offset, length);
+      if ((s = nodeToIndex.get(comparisonNode)) != null) {
+        moveToHead(indexToNode[s]);
+        return s;
+      } else {
+        return -1;
+      }
+    }
+
     private byte[] get(short idx) {
       Preconditions.checkElementIndex(idx, currSize);
       moveToHead(indexToNode[idx]);
-      return indexToNode[idx].container;
+      return indexToNode[idx].getContents();
     }
 
     private void moveToHead(Node n) {
@@ -153,7 +178,7 @@
       // Node is now removed from the list. Re-add it at the head.
       setHead(n);
     }
-
+
     private void setHead(Node n) {
       // assume it's already unlinked from the list at this point.
       n.prev = null;
@@ -175,7 +200,7 @@
       for (int i = 0; i < currSize; i++) {
         indexToNode[i].next = null;
         indexToNode[i].prev = null;
-        indexToNode[i].container = null;
+        indexToNode[i].resetContents();
       }
       currSize = 0;
       nodeToIndex.clear();
@@ -183,27 +208,41 @@
       head = null;
     }
 
-    private static class Node {
-      byte[] container;
+    private static abstract class Node {
       int offset;
       int length;
      Node next; // link towards the tail
      Node prev; // link towards the head
 
+      abstract void setContents(byte[] container, int offset, int length);
+      abstract byte[] getContents();
+      abstract void resetContents();
+    }
+    // The actual contents of the LRUDictionary are of ByteArrayBackedNode type
+    private static class ByteArrayBackedNode extends Node {
+      private byte[] container;
-      public Node() {
-      }
-
-      private void setContents(byte[] container, int offset, int length) {
+      @Override
+      void setContents(byte[] container, int offset, int length) {
        this.container = container;
        this.offset = offset;
        this.length = length;
      }
 
+      @Override
+      byte[] getContents() {
+        return this.container;
+      }
+
      @Override
      public int hashCode() {
        return Bytes.hashCode(container, offset, length);
      }
 
+      @Override
+      void resetContents() {
+        this.container = null;
+      }
+
      @Override
      public boolean equals(Object other) {
        if (!(other instanceof Node)) {
@@ -211,9 +250,75 @@
        }
        Node casted = (Node) other;
-        return Bytes.equals(container, offset, length, casted.container,
+        return Bytes.equals(container, offset, length, casted.getContents(),
          casted.offset, casted.length);
      }
    }
+
+    // Currently only used for finding the index and hence this node acts
+    // as a temporary holder to look up in the indexToNode map
+    // which is formed by ByteArrayBackedNode
+    private static class ByteBufferBackedNode extends Node {
+      private ByteBuffer container;
+
+      public ByteBufferBackedNode() {
+      }
+
+      @Override
+      void setContents(byte[] container, int offset, int length) {
+        this.container = ByteBuffer.wrap(container);
+        this.offset = offset;
+        this.length = length;
+      }
+
+      void setContents(ByteBuffer container, int offset, int length) {
+        this.container = container;
+        this.offset = offset;
+        this.length = length;
+      }
+
+      @Override
+      void resetContents() {
+        this.container = null;
+      }
+
+      @Override
+      byte[] getContents() {
+        // This ideally should not be called
+        byte[] copy = new byte[this.length];
+        ByteBufferUtils.copyFromBufferToArray(copy, (ByteBuffer) this.container, this.offset, 0,
+          length);
+        return copy;
+      }
+
+      @Override
+      public int hashCode() {
+        return ByteBufferUtils.hashCode(container, offset, length);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        if (!(other instanceof Node)) {
+          return false;
+        }
+        // This was done to avoid findbugs comment
+        Node casted = (Node) other;
+        // The other side should be a byte array backed node only as we add only
+        // ByteArrayBackedNode to the indexToNode map.
+        return ByteBufferUtils.equals(this.container, offset, length,
+          casted.getContents(), casted.offset, casted.length);
+      }
+    }
+  }
+
+  @Override
+  public short findEntry(ByteBuffer data, int offset, int length) {
+    short ret = backingStore.findIdx(data, offset, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      byte[] copy = new byte[length];
+      ByteBufferUtils.copyFromBufferToArray(copy, data, offset, 0, length);
+      addEntryInternal(copy, 0, length, false);
+    }
+    return ret;
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 99e798a2cf1..7f3d777a473 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -579,6 +579,22 @@
     return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
   }
 
+  /**
+   * @param buf
+   *          ByteBuffer to hash
+   * @param offset
+   *          offset to start from
+   * @param length
+   *          length to hash
+   */
+  public static int hashCode(ByteBuffer buf, int offset, int length) {
+    int hash = 1;
+    for (int i = offset; i < offset + length; i++) {
+      hash = (31 * hash) + (int) toByte(buf, i);
+    }
+    return hash;
+  }
+
   public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
     if (UNSAFE_UNALIGNED) {
       long offset1Adj, offset2Adj;
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 6c46cf24269..a332a630fdc 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -21,18 +21,22 @@ package org.apache.hadoop.hbase.io;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -51,12 +55,12 @@
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     KeyValue kv1 = createKVWithTags(2);
     int tagsLength1 = kv1.getTagsLength();
-    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
-    context.compressTags(baos, ib, tagsLength1);
+    ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray());
+    context.compressTags(baos, ib, kv1.getTagsOffset(), tagsLength1);
     KeyValue kv2 = createKVWithTags(3);
     int tagsLength2 = kv2.getTagsLength();
-    ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
-    context.compressTags(baos, ib, tagsLength2);
+    ib = ByteBuffer.wrap(kv2.getTagsArray());
+    context.compressTags(baos, ib, kv2.getTagsOffset(), tagsLength2);
 
     context.clear();
 
@@ -71,6 +75,31 @@
         tagsLength2));
   }
 
+  @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.getBuffer());
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
   @Test
   public void testCompressUncompressTags2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -84,7 +113,32 @@
 
     context.clear();
 
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
     byte[] dest = new byte[tagsLength1];
     context.uncompressTags(bais, dest, 0, tagsLength1);
     assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
     byte[] dest = new byte[tagsLength1];
     context.uncompressTags(bais, dest, 0, tagsLength1);
     assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
@@ -103,4 +157,16 @@
     KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
     return kv;
   }
+
+  private Cell createOffheapKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
+    OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
+    return offheapKV;
+  }
 }
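
Below is an illustrative usage sketch, not part of the patch: it shows how caller code might exercise the new CellUtil.compressTags(DataOutputStream, Cell, TagCompressionContext) entry point added above. It assumes an HBase build that already contains this change on the classpath; the class name TagCompressionSketch and the row/family/qualifier/value/tag literals are made up for the example.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.Collections;

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;

public class TagCompressionSketch {
  public static void main(String[] args) throws Exception {
    // Dictionary-backed context, mirroring what BufferedDataBlockEncoder sets up
    // when tag compression is enabled for a block encoding.
    TagCompressionContext context =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);

    // An on-heap cell carrying one tag; a ByteBufferedCell would take the ByteBuffer path.
    Cell cell = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        1L, Bytes.toBytes("value"),
        Collections.<Tag> singletonList(new ArrayBackedTag((byte) 1, "tagValue")));

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    // New helper from this patch: dispatches to the byte[] or ByteBuffer overload of
    // TagCompressionContext.compressTags() depending on the concrete Cell type.
    CellUtil.compressTags(out, cell, context);
    out.flush();
    System.out.println("compressed tag bytes: " + baos.size());
  }
}

For a ByteBuffer-backed cell such as the OffheapKeyValue used in the tests, the same call takes the ByteBuffer path and the dictionary lookup goes through the new findEntry(ByteBuffer, int, int), so the tag bytes are only copied onto the heap when a new dictionary entry actually has to be stored.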