HBASE-14841 Allow Dictionary to work with BytebufferedCells (Ram)
This commit is contained in:
parent
47506e805d
commit
0de221a19d
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.io.HeapSize;
|
import org.apache.hadoop.hbase.io.HeapSize;
|
||||||
|
import org.apache.hadoop.hbase.io.TagCompressionContext;
|
||||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
import org.apache.hadoop.hbase.util.ByteRange;
|
import org.apache.hadoop.hbase.util.ByteRange;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -1629,6 +1630,24 @@ public final class CellUtil {
|
||||||
return new FirstOnRowDeleteFamilyCell(row, fam);
|
return new FirstOnRowDeleteFamilyCell(row, fam);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compresses the tags to the given outputstream using the TagcompressionContext
|
||||||
|
* @param out the outputstream to which the compression should happen
|
||||||
|
* @param cell the cell which has tags
|
||||||
|
* @param tagCompressionContext the TagCompressionContext
|
||||||
|
* @throws IOException can throw IOException if the compression encounters issue
|
||||||
|
*/
|
||||||
|
public static void compressTags(DataOutputStream out, Cell cell,
|
||||||
|
TagCompressionContext tagCompressionContext) throws IOException {
|
||||||
|
if (cell instanceof ByteBufferedCell) {
|
||||||
|
tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
|
||||||
|
((ByteBufferedCell) cell).getTagsPosition(), cell.getTagsLength());
|
||||||
|
} else {
|
||||||
|
tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
|
||||||
|
cell.getTagsLength());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
/**
|
/**
|
||||||
* These cells are used in reseeks/seeks to improve the read performance.
|
* These cells are used in reseeks/seeks to improve the read performance.
|
||||||
|
|
|
@ -79,17 +79,24 @@ public class TagCompressionContext {
|
||||||
* Compress tags one by one and writes to the OutputStream.
|
* Compress tags one by one and writes to the OutputStream.
|
||||||
* @param out Stream to which the compressed tags to be written
|
* @param out Stream to which the compressed tags to be written
|
||||||
* @param in Source buffer where tags are available
|
* @param in Source buffer where tags are available
|
||||||
|
* @param offset Offset for the tags byte buffer
|
||||||
* @param length Length of all tag bytes
|
* @param length Length of all tag bytes
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
|
public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
if (in.hasArray()) {
|
if (in.hasArray()) {
|
||||||
compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
|
compressTags(out, in.array(), offset, length);
|
||||||
ByteBufferUtils.skip(in, length);
|
|
||||||
} else {
|
} else {
|
||||||
byte[] tagBuf = new byte[length];
|
int pos = offset;
|
||||||
in.get(tagBuf);
|
int endOffset = pos + length;
|
||||||
compressTags(out, tagBuf, 0, length);
|
assert pos < endOffset;
|
||||||
|
while (pos < endOffset) {
|
||||||
|
int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
|
||||||
|
pos += Tag.TAG_LENGTH_SIZE;
|
||||||
|
write(in, pos, tagLen, out);
|
||||||
|
pos += tagLen;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +174,7 @@ public class TagCompressionContext {
|
||||||
* @param src Stream where the compressed tags are available
|
* @param src Stream where the compressed tags are available
|
||||||
* @param dest Destination buffer where to write the uncompressed tags
|
* @param dest Destination buffer where to write the uncompressed tags
|
||||||
* @param length Length of all tag bytes
|
* @param length Length of all tag bytes
|
||||||
* @throws IOException
|
* @throws IOException when the dictionary does not have the entry
|
||||||
*/
|
*/
|
||||||
public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
|
public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
|
||||||
if (dest.hasArray()) {
|
if (dest.hasArray()) {
|
||||||
|
@ -192,4 +199,18 @@ public class TagCompressionContext {
|
||||||
StreamUtils.writeShort(out, dictIdx);
|
StreamUtils.writeShort(out, dictIdx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
|
||||||
|
short dictIdx = Dictionary.NOT_IN_DICTIONARY;
|
||||||
|
if (tagDict != null) {
|
||||||
|
dictIdx = tagDict.findEntry(data, offset, length);
|
||||||
|
}
|
||||||
|
if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
|
||||||
|
out.write(Dictionary.NOT_IN_DICTIONARY);
|
||||||
|
StreamUtils.writeRawVInt32(out, length);
|
||||||
|
ByteBufferUtils.copyBufferToStream(out, data, offset, length);
|
||||||
|
} else {
|
||||||
|
StreamUtils.writeShort(out, dictIdx);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1002,10 +1002,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
||||||
// When tag compression is enabled, tagCompressionContext will have a not null value. Write
|
// When tag compression is enabled, tagCompressionContext will have a not null value. Write
|
||||||
// the tags using Dictionary compression in such a case
|
// the tags using Dictionary compression in such a case
|
||||||
if (tagCompressionContext != null) {
|
if (tagCompressionContext != null) {
|
||||||
// TODO : Make Dictionary interface to work with BBs and then change the corresponding
|
// Not passing tagsLength considering that parsing of the tagsLength is not costly
|
||||||
// compress tags code to work with BB
|
CellUtil.compressTags(out, cell, tagCompressionContext);
|
||||||
tagCompressionContext
|
|
||||||
.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
|
||||||
} else {
|
} else {
|
||||||
CellUtil.writeTags(out, cell, tagsLength);
|
CellUtil.writeTags(out, cell, tagsLength);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.io.util;
|
package org.apache.hadoop.hbase.io.util;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -50,6 +52,16 @@ public interface Dictionary {
|
||||||
*/
|
*/
|
||||||
short findEntry(byte[] data, int offset, int length);
|
short findEntry(byte[] data, int offset, int length);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds the index of an entry.
|
||||||
|
* If no entry found, we add it.
|
||||||
|
* @param data the ByteBuffer that we're looking up
|
||||||
|
* @param offset Offset into <code>data</code> to add to Dictionary.
|
||||||
|
* @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
|
||||||
|
* @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
|
||||||
|
*/
|
||||||
|
short findEntry(ByteBuffer data, int offset, int length);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds an entry to the dictionary.
|
* Adds an entry to the dictionary.
|
||||||
* Be careful using this method. It will add an entry to the
|
* Be careful using this method. It will add an entry to the
|
||||||
|
@ -62,7 +74,6 @@ public interface Dictionary {
|
||||||
* @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
|
* @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
|
||||||
* @return the index of the entry
|
* @return the index of the entry
|
||||||
*/
|
*/
|
||||||
|
|
||||||
short addEntry(byte[] data, int offset, int length);
|
short addEntry(byte[] data, int offset, int length);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.io.util;
|
package org.apache.hadoop.hbase.io.util;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -58,8 +60,12 @@ public class LRUDictionary implements Dictionary {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public short addEntry(byte[] data, int offset, int length) {
|
public short addEntry(byte[] data, int offset, int length) {
|
||||||
|
return addEntryInternal(data, offset, length, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private short addEntryInternal(byte[] data, int offset, int length, boolean copy) {
|
||||||
if (length <= 0) return NOT_IN_DICTIONARY;
|
if (length <= 0) return NOT_IN_DICTIONARY;
|
||||||
return backingStore.put(data, offset, length);
|
return backingStore.put(data, offset, length, copy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -89,16 +95,23 @@ public class LRUDictionary implements Dictionary {
|
||||||
indexToNode = new Node[initialSize];
|
indexToNode = new Node[initialSize];
|
||||||
}
|
}
|
||||||
|
|
||||||
private short put(byte[] array, int offset, int length) {
|
private short put(byte[] array, int offset, int length, boolean copy) {
|
||||||
// We copy the bytes we want, otherwise we might be holding references to
|
if (copy) {
|
||||||
// massive arrays in our dictionary (or those arrays might change)
|
// We copy the bytes we want, otherwise we might be holding references to
|
||||||
byte[] stored = new byte[length];
|
// massive arrays in our dictionary (or those arrays might change)
|
||||||
Bytes.putBytes(stored, 0, array, offset, length);
|
byte[] stored = new byte[length];
|
||||||
|
Bytes.putBytes(stored, 0, array, offset, length);
|
||||||
|
return putInternal(stored);
|
||||||
|
} else {
|
||||||
|
return putInternal(array);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private short putInternal(byte[] stored) {
|
||||||
if (currSize < initSize) {
|
if (currSize < initSize) {
|
||||||
// There is space to add without evicting.
|
// There is space to add without evicting.
|
||||||
if (indexToNode[currSize] == null) {
|
if (indexToNode[currSize] == null) {
|
||||||
indexToNode[currSize] = new Node();
|
indexToNode[currSize] = new ByteArrayBackedNode();
|
||||||
}
|
}
|
||||||
indexToNode[currSize].setContents(stored, 0, stored.length);
|
indexToNode[currSize].setContents(stored, 0, stored.length);
|
||||||
setHead(indexToNode[currSize]);
|
setHead(indexToNode[currSize]);
|
||||||
|
@ -117,7 +130,7 @@ public class LRUDictionary implements Dictionary {
|
||||||
|
|
||||||
private short findIdx(byte[] array, int offset, int length) {
|
private short findIdx(byte[] array, int offset, int length) {
|
||||||
Short s;
|
Short s;
|
||||||
final Node comparisonNode = new Node();
|
final Node comparisonNode = new ByteArrayBackedNode();
|
||||||
comparisonNode.setContents(array, offset, length);
|
comparisonNode.setContents(array, offset, length);
|
||||||
if ((s = nodeToIndex.get(comparisonNode)) != null) {
|
if ((s = nodeToIndex.get(comparisonNode)) != null) {
|
||||||
moveToHead(indexToNode[s]);
|
moveToHead(indexToNode[s]);
|
||||||
|
@ -127,10 +140,22 @@ public class LRUDictionary implements Dictionary {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private short findIdx(ByteBuffer buf, int offset, int length) {
|
||||||
|
Short s;
|
||||||
|
final ByteBufferBackedNode comparisonNode = new ByteBufferBackedNode();
|
||||||
|
comparisonNode.setContents(buf, offset, length);
|
||||||
|
if ((s = nodeToIndex.get(comparisonNode)) != null) {
|
||||||
|
moveToHead(indexToNode[s]);
|
||||||
|
return s;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private byte[] get(short idx) {
|
private byte[] get(short idx) {
|
||||||
Preconditions.checkElementIndex(idx, currSize);
|
Preconditions.checkElementIndex(idx, currSize);
|
||||||
moveToHead(indexToNode[idx]);
|
moveToHead(indexToNode[idx]);
|
||||||
return indexToNode[idx].container;
|
return indexToNode[idx].getContents();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveToHead(Node n) {
|
private void moveToHead(Node n) {
|
||||||
|
@ -153,7 +178,7 @@ public class LRUDictionary implements Dictionary {
|
||||||
// Node is now removed from the list. Re-add it at the head.
|
// Node is now removed from the list. Re-add it at the head.
|
||||||
setHead(n);
|
setHead(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setHead(Node n) {
|
private void setHead(Node n) {
|
||||||
// assume it's already unlinked from the list at this point.
|
// assume it's already unlinked from the list at this point.
|
||||||
n.prev = null;
|
n.prev = null;
|
||||||
|
@ -175,7 +200,7 @@ public class LRUDictionary implements Dictionary {
|
||||||
for (int i = 0; i < currSize; i++) {
|
for (int i = 0; i < currSize; i++) {
|
||||||
indexToNode[i].next = null;
|
indexToNode[i].next = null;
|
||||||
indexToNode[i].prev = null;
|
indexToNode[i].prev = null;
|
||||||
indexToNode[i].container = null;
|
indexToNode[i].resetContents();
|
||||||
}
|
}
|
||||||
currSize = 0;
|
currSize = 0;
|
||||||
nodeToIndex.clear();
|
nodeToIndex.clear();
|
||||||
|
@ -183,27 +208,41 @@ public class LRUDictionary implements Dictionary {
|
||||||
head = null;
|
head = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Node {
|
private static abstract class Node {
|
||||||
byte[] container;
|
|
||||||
int offset;
|
int offset;
|
||||||
int length;
|
int length;
|
||||||
Node next; // link towards the tail
|
Node next; // link towards the tail
|
||||||
Node prev; // link towards the head
|
Node prev; // link towards the head
|
||||||
|
abstract void setContents(byte[] container, int offset, int length);
|
||||||
|
abstract byte[] getContents();
|
||||||
|
abstract void resetContents();
|
||||||
|
}
|
||||||
|
// The actual contents of the LRUDictionary are of ByteArrayBackedNode type
|
||||||
|
private static class ByteArrayBackedNode extends Node {
|
||||||
|
private byte[] container;
|
||||||
|
|
||||||
public Node() {
|
@Override
|
||||||
}
|
void setContents(byte[] container, int offset, int length) {
|
||||||
|
|
||||||
private void setContents(byte[] container, int offset, int length) {
|
|
||||||
this.container = container;
|
this.container = container;
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
byte[] getContents() {
|
||||||
|
return this.container;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Bytes.hashCode(container, offset, length);
|
return Bytes.hashCode(container, offset, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void resetContents() {
|
||||||
|
this.container = null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object other) {
|
public boolean equals(Object other) {
|
||||||
if (!(other instanceof Node)) {
|
if (!(other instanceof Node)) {
|
||||||
|
@ -211,9 +250,75 @@ public class LRUDictionary implements Dictionary {
|
||||||
}
|
}
|
||||||
|
|
||||||
Node casted = (Node) other;
|
Node casted = (Node) other;
|
||||||
return Bytes.equals(container, offset, length, casted.container,
|
return Bytes.equals(container, offset, length, casted.getContents(),
|
||||||
casted.offset, casted.length);
|
casted.offset, casted.length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Currently only used for finding the index and hence this node acts
|
||||||
|
// as a temporary holder to look up in the indexToNode map
|
||||||
|
// which is formed by ByteArrayBackedNode
|
||||||
|
private static class ByteBufferBackedNode extends Node {
|
||||||
|
private ByteBuffer container;
|
||||||
|
|
||||||
|
public ByteBufferBackedNode() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void setContents(byte[] container, int offset, int length) {
|
||||||
|
this.container = ByteBuffer.wrap(container);
|
||||||
|
this.offset = offset;
|
||||||
|
this.length = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setContents(ByteBuffer container, int offset, int length) {
|
||||||
|
this.container = container;
|
||||||
|
this.offset = offset;
|
||||||
|
this.length = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void resetContents() {
|
||||||
|
this.container = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
byte[] getContents() {
|
||||||
|
// This ideally should not be called
|
||||||
|
byte[] copy = new byte[this.length];
|
||||||
|
ByteBufferUtils.copyFromBufferToArray(copy, (ByteBuffer) this.container, this.offset, 0,
|
||||||
|
length);
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return ByteBufferUtils.hashCode(container, offset, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (!(other instanceof Node)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// This was done to avoid findbugs comment
|
||||||
|
Node casted = (Node) other;
|
||||||
|
// The other side should be a byte array backed node only as we add only
|
||||||
|
// ByteArrayBackedNode to the indexToNode map.
|
||||||
|
return ByteBufferUtils.equals(this.container, offset, length,
|
||||||
|
casted.getContents(), casted.offset, casted.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public short findEntry(ByteBuffer data, int offset, int length) {
|
||||||
|
short ret = backingStore.findIdx(data, offset, length);
|
||||||
|
if (ret == NOT_IN_DICTIONARY) {
|
||||||
|
byte[] copy = new byte[length];
|
||||||
|
ByteBufferUtils.copyFromBufferToArray(copy, data, offset, 0, length);
|
||||||
|
addEntryInternal(copy, 0, length, false);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -579,6 +579,22 @@ public final class ByteBufferUtils {
|
||||||
return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
|
return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param buf
|
||||||
|
* ByteBuffer to hash
|
||||||
|
* @param offset
|
||||||
|
* offset to start from
|
||||||
|
* @param length
|
||||||
|
* length to hash
|
||||||
|
*/
|
||||||
|
public static int hashCode(ByteBuffer buf, int offset, int length) {
|
||||||
|
int hash = 1;
|
||||||
|
for (int i = offset; i < offset + length; i++) {
|
||||||
|
hash = (31 * hash) + (int) toByte(buf, i);
|
||||||
|
}
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
|
||||||
public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
|
public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
|
||||||
if (UNSAFE_UNALIGNED) {
|
if (UNSAFE_UNALIGNED) {
|
||||||
long offset1Adj, offset2Adj;
|
long offset1Adj, offset2Adj;
|
||||||
|
|
|
@ -21,18 +21,22 @@ package org.apache.hadoop.hbase.io;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.OffheapKeyValue;
|
||||||
import org.apache.hadoop.hbase.Tag;
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.ArrayBackedTag;
|
import org.apache.hadoop.hbase.ArrayBackedTag;
|
||||||
|
import org.apache.hadoop.hbase.ByteBufferedCell;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.io.util.LRUDictionary;
|
import org.apache.hadoop.hbase.io.util.LRUDictionary;
|
||||||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -51,12 +55,12 @@ public class TestTagCompressionContext {
|
||||||
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
|
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
|
||||||
KeyValue kv1 = createKVWithTags(2);
|
KeyValue kv1 = createKVWithTags(2);
|
||||||
int tagsLength1 = kv1.getTagsLength();
|
int tagsLength1 = kv1.getTagsLength();
|
||||||
ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
|
ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray());
|
||||||
context.compressTags(baos, ib, tagsLength1);
|
context.compressTags(baos, ib, kv1.getTagsOffset(), tagsLength1);
|
||||||
KeyValue kv2 = createKVWithTags(3);
|
KeyValue kv2 = createKVWithTags(3);
|
||||||
int tagsLength2 = kv2.getTagsLength();
|
int tagsLength2 = kv2.getTagsLength();
|
||||||
ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
|
ib = ByteBuffer.wrap(kv2.getTagsArray());
|
||||||
context.compressTags(baos, ib, tagsLength2);
|
context.compressTags(baos, ib, kv2.getTagsOffset(), tagsLength2);
|
||||||
|
|
||||||
context.clear();
|
context.clear();
|
||||||
|
|
||||||
|
@ -71,6 +75,31 @@ public class TestTagCompressionContext {
|
||||||
tagsLength2));
|
tagsLength2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
|
||||||
|
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
|
||||||
|
ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
|
||||||
|
int tagsLength1 = kv1.getTagsLength();
|
||||||
|
context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
|
||||||
|
ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
|
||||||
|
int tagsLength2 = kv2.getTagsLength();
|
||||||
|
context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
|
||||||
|
|
||||||
|
context.clear();
|
||||||
|
|
||||||
|
byte[] dest = new byte[tagsLength1];
|
||||||
|
ByteBuffer ob = ByteBuffer.wrap(baos.getBuffer());
|
||||||
|
context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
|
||||||
|
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
|
||||||
|
tagsLength1));
|
||||||
|
dest = new byte[tagsLength2];
|
||||||
|
context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
|
||||||
|
assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
|
||||||
|
tagsLength2));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCompressUncompressTags2() throws Exception {
|
public void testCompressUncompressTags2() throws Exception {
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
@ -84,7 +113,32 @@ public class TestTagCompressionContext {
|
||||||
|
|
||||||
context.clear();
|
context.clear();
|
||||||
|
|
||||||
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
|
ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
|
||||||
|
byte[] dest = new byte[tagsLength1];
|
||||||
|
context.uncompressTags(bais, dest, 0, tagsLength1);
|
||||||
|
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
|
||||||
|
tagsLength1));
|
||||||
|
dest = new byte[tagsLength2];
|
||||||
|
context.uncompressTags(bais, dest, 0, tagsLength2);
|
||||||
|
assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
|
||||||
|
tagsLength2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
|
||||||
|
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
|
||||||
|
ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
|
||||||
|
int tagsLength1 = kv1.getTagsLength();
|
||||||
|
context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
|
||||||
|
ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
|
||||||
|
int tagsLength2 = kv2.getTagsLength();
|
||||||
|
context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
|
||||||
|
|
||||||
|
context.clear();
|
||||||
|
|
||||||
|
ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
|
||||||
byte[] dest = new byte[tagsLength1];
|
byte[] dest = new byte[tagsLength1];
|
||||||
context.uncompressTags(bais, dest, 0, tagsLength1);
|
context.uncompressTags(bais, dest, 0, tagsLength1);
|
||||||
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
|
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
|
||||||
|
@ -103,4 +157,16 @@ public class TestTagCompressionContext {
|
||||||
KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
|
KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
|
||||||
return kv;
|
return kv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Cell createOffheapKVWithTags(int noOfTags) {
|
||||||
|
List<Tag> tags = new ArrayList<Tag>();
|
||||||
|
for (int i = 0; i < noOfTags; i++) {
|
||||||
|
tags.add(new ArrayBackedTag((byte) i, "tagValue" + i));
|
||||||
|
}
|
||||||
|
KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
|
||||||
|
ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
|
||||||
|
ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
|
||||||
|
OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
|
||||||
|
return offheapKV;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue