HBASE-11437 Modify cell tag handling code to treat the length as unsigned. (Anoop)

anoopsjohn 2014-07-13 12:10:21 +05:30
parent f5e13c7460
commit 213e565bce
23 changed files with 293 additions and 222 deletions
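The heart of the change: Java's short is signed, so a tags length above Short.MAX_VALUE (32767) read back via Bytes.toShort or ByteBuffer.getShort comes out negative, while the same two bytes interpreted as unsigned can carry any length up to 65535. A minimal sketch of the difference, not taken from the patch itself:

    byte[] b = new byte[2];
    int len = 40000;                                   // valid tags length, but > Short.MAX_VALUE
    b[0] = (byte) (len >> 8);                          // high byte first, as in the KeyValue layout
    b[1] = (byte) len;                                 // low byte
    short signed = (short) (((b[0] & 0xff) << 8) | (b[1] & 0xff));   // narrows to -25536
    int unsigned = ((b[0] & 0xff) << 8) | (b[1] & 0xff);             // stays 40000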


@ -171,7 +171,7 @@ public class TestPayloadCarryingRpcController {
}
@Override
public short getTagsLength() {
public int getTagsLength() {
// unused
return 0;
}


@ -190,11 +190,11 @@ public interface Cell {
* @return the first offset where the tags start in the Cell
*/
int getTagsOffset();
/**
* @return the total length of the tags in the Cell.
*/
short getTagsLength();
int getTagsLength();
/**
* WARNING do not use, expensive. This gets an arraycopy of the cell's value.


@ -489,8 +489,8 @@ public final class CellUtil {
@Override
public Tag next() {
if (hasNext()) {
short curTagLen = Bytes.toShort(tags, this.pos);
Tag tag = new Tag(tags, pos, (short) (curTagLen + Bytes.SIZEOF_SHORT));
int curTagLen = Bytes.readAsInt(tags, this.pos, Tag.TAG_LENGTH_SIZE);
Tag tag = new Tag(tags, pos, curTagLen + Tag.TAG_LENGTH_SIZE);
this.pos += Bytes.SIZEOF_SHORT + curTagLen;
return tag;
}


@ -143,6 +143,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
/**
* Computes the number of bytes that a <code>KeyValue</code> instance with the provided
@ -798,7 +799,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
pos = Bytes.putByte(bytes, pos, type.getCode());
pos += vlength;
if (tagsLength > 0) {
pos = Bytes.putShort(bytes, pos, (short)(tagsLength & 0x0000ffff));
pos = Bytes.putAsShort(bytes, pos, tagsLength);
}
return bytes;
}
@ -916,7 +917,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
}
// Write the number of tags. If it is 0 then it means there are no tags.
if (tagsLength > 0) {
pos = Bytes.putShort(buffer, pos, (short) tagsLength);
pos = Bytes.putAsShort(buffer, pos, tagsLength);
for (Tag t : tags) {
pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
}
@ -925,8 +926,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
}
private static void checkForTagsLength(int tagsLength) {
if (tagsLength > Short.MAX_VALUE) {
throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + Short.MAX_VALUE);
if (tagsLength > MAX_TAGS_LENGTH) {
throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
}
}
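For reference, the new bound works out to the full unsigned 16-bit range: MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1 = 2 * 32767 + 1 = 65535 = 0xFFFF. So checkForTagsLength now rejects only lengths that genuinely cannot fit in the 2-byte tags-length field, instead of anything above 32767.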
@ -981,7 +982,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
}
// Add the tags after the value part
if (tagsLength > 0) {
pos = Bytes.putShort(bytes, pos, (short) (tagsLength));
pos = Bytes.putAsShort(bytes, pos, tagsLength);
pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
}
return bytes;
@ -1041,7 +1042,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
}
// Add the tags after the value part
if (tagsLength > 0) {
pos = Bytes.putShort(bytes, pos, (short) (tagsLength));
pos = Bytes.putAsShort(bytes, pos, tagsLength);
for (Tag t : tags) {
pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
}
@ -1554,7 +1555,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
*/
@Override
public int getTagsOffset() {
short tagsLen = getTagsLength();
int tagsLen = getTagsLength();
if (tagsLen == 0) {
return this.offset + this.length;
}
@ -1565,14 +1566,14 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
* This returns the total length of the tag bytes
*/
@Override
public short getTagsLength() {
public int getTagsLength() {
int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
if (tagsLen > 0) {
// There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
// length
tagsLen -= TAGS_LENGTH_SIZE;
}
return (short) tagsLen;
return tagsLen;
}
/**
@ -1580,7 +1581,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
* @return The tags
*/
public List<Tag> getTags() {
short tagsLength = getTagsLength();
int tagsLength = getTagsLength();
if (tagsLength == 0) {
return EMPTY_ARRAY_LIST;
}
@ -2690,7 +2691,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
@Override
public int getTagsOffset() {
return (short) 0;
return 0;
}
@Override
@ -2709,8 +2710,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
}
@Override
public short getTagsLength() {
return (short) 0;
public int getTagsLength() {
return 0;
}
@Override


@ -117,7 +117,7 @@ public class KeyValueUtil {
pos = appendKeyToByteArrayWithoutValue(cell, output, pos);
pos = CellUtil.copyValueTo(cell, output, pos);
if ((cell.getTagsLength() > 0)) {
pos = Bytes.putShort(output, pos, cell.getTagsLength());
pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
pos = CellUtil.copyTagTo(cell, output, pos);
}
return pos;
@ -166,9 +166,10 @@ public class KeyValueUtil {
int keyLength = bb.getInt();
int valueLength = bb.getInt();
ByteBufferUtils.skip(bb, keyLength + valueLength);
short tagsLength = 0;
int tagsLength = 0;
if (includesTags) {
tagsLength = bb.getShort();
// Read short as unsigned, high byte first
tagsLength = ((bb.get() & 0xff) << 8) ^ (bb.get() & 0xff);
ByteBufferUtils.skip(bb, tagsLength);
}
int kvLength = (int) KeyValue.getKeyValueDataStructureSize(keyLength, valueLength, tagsLength);
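The recurring ByteBuffer idiom above reads the 2-byte length as an unsigned value, high byte first; because the two masked operands have no overlapping bits, the ^ here behaves exactly like |. A small standalone sketch of the same idiom (values illustrative only):

    java.nio.ByteBuffer bb = java.nio.ByteBuffer.allocate(2);
    bb.put((byte) 0x9C).put((byte) 0x40);                            // 40000, written high byte first
    bb.flip();
    int tagsLength = ((bb.get() & 0xff) << 8) ^ (bb.get() & 0xff);   // 40000, where getShort() would give -25536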


@ -35,11 +35,12 @@ public class Tag {
public final static int TYPE_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
public final static int TAG_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
public final static int INFRASTRUCTURE_SIZE = TYPE_LENGTH_SIZE + TAG_LENGTH_SIZE;
private static final int MAX_TAG_LENGTH = (2 * Short.MAX_VALUE) + 1 - TAG_LENGTH_SIZE;
private final byte type;
private final byte[] bytes;
private int offset = 0;
private short length = 0;
private int length = 0;
// The special tag will write the length of each tag and that will be
// followed by the type and then the actual tag.
@ -54,13 +55,19 @@ public class Tag {
* @param tag
*/
public Tag(byte tagType, byte[] tag) {
/** <length of tag - 2 bytes><type code - 1 byte><tag>
* taglength maximum is Short.MAX_SIZE. It includes 1 byte type length and actual tag bytes length.
/**
* Format for a tag : <length of tag - 2 bytes><type code - 1 byte><tag>. The tag length is
* serialized using 2 bytes only, but as it is treated as unsigned, the maximum tag length is
* (Short.MAX_VALUE * 2) + 1. It includes the 1 byte type code and the actual tag bytes length.
*/
short tagLength = (short) ((tag.length & 0x0000ffff) + TYPE_LENGTH_SIZE);
length = (short) (TAG_LENGTH_SIZE + tagLength);
int tagLength = tag.length + TYPE_LENGTH_SIZE;
if (tagLength > MAX_TAG_LENGTH) {
throw new IllegalArgumentException(
"Invalid tag data being passed. Its length can not exceed " + MAX_TAG_LENGTH);
}
length = TAG_LENGTH_SIZE + tagLength;
bytes = new byte[length];
int pos = Bytes.putShort(bytes, 0, tagLength);
int pos = Bytes.putAsShort(bytes, 0, tagLength);
pos = Bytes.putByte(bytes, pos, tagType);
Bytes.putBytes(bytes, pos, tag, 0, tag.length);
this.type = tagType;
@ -80,14 +87,14 @@ public class Tag {
this(bytes, offset, getLength(bytes, offset));
}
private static short getLength(byte[] bytes, int offset) {
return (short) (TAG_LENGTH_SIZE + Bytes.toShort(bytes, offset));
private static int getLength(byte[] bytes, int offset) {
return TAG_LENGTH_SIZE + Bytes.readAsInt(bytes, offset, TAG_LENGTH_SIZE);
}
/**
* Creates a Tag from the specified byte array, starting at offset, and for
* length <code>length</code>. Presumes <code>bytes</code> content starting at
* <code>offset</code> is formatted as a Tag blob.
* Creates a Tag from the specified byte array, starting at offset, and for length
* <code>length</code>. Presumes <code>bytes</code> content starting at <code>offset</code> is
* formatted as a Tag blob.
* @param bytes
* byte array
* @param offset
@ -95,7 +102,11 @@ public class Tag {
* @param length
* length of the Tag
*/
public Tag(byte[] bytes, int offset, short length) {
public Tag(byte[] bytes, int offset, int length) {
if (length > MAX_TAG_LENGTH) {
throw new IllegalArgumentException(
"Invalid tag data being passed. Its length can not exceed " + MAX_TAG_LENGTH);
}
this.bytes = bytes;
this.offset = offset;
this.length = length;
@ -156,8 +167,8 @@ public class Tag {
List<Tag> tags = new ArrayList<Tag>();
int pos = offset;
while (pos < offset + length) {
short tagLen = Bytes.toShort(b, pos);
tags.add(new Tag(b, pos, (short) (tagLen + TAG_LENGTH_SIZE)));
int tagLen = Bytes.readAsInt(b, pos, TAG_LENGTH_SIZE);
tags.add(new Tag(b, pos, tagLen + TAG_LENGTH_SIZE));
pos += TAG_LENGTH_SIZE + tagLen;
}
return tags;
@ -174,9 +185,9 @@ public class Tag {
public static Tag getTag(byte[] b, int offset, int length, byte type) {
int pos = offset;
while (pos < offset + length) {
short tagLen = Bytes.toShort(b, pos);
int tagLen = Bytes.readAsInt(b, pos, TAG_LENGTH_SIZE);
if(b[pos + TAG_LENGTH_SIZE] == type) {
return new Tag(b, pos, (short) (tagLen + TAG_LENGTH_SIZE));
return new Tag(b, pos, tagLen + TAG_LENGTH_SIZE);
}
pos += TAG_LENGTH_SIZE + tagLen;
}
@ -186,7 +197,7 @@ public class Tag {
/**
* Returns the total length of the entire tag entity
*/
short getLength() {
int getLength() {
return this.length;
}
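Based on the Tag API shown in this file, a tag whose serialized length exceeds Short.MAX_VALUE can now be created and read back without truncation; a rough sketch (the 40000-byte payload is purely illustrative):

    byte[] bigValue = new byte[40000];              // > Short.MAX_VALUE, still <= MAX_TAG_LENGTH (65533)
    Tag tag = new Tag((byte) 1, bigValue);          // length prefix written with Bytes.putAsShort
    int valueLen = tag.getTagLength();              // 40000 when the prefix is read back as unsigned
    // a <type + tag bytes> length above MAX_TAG_LENGTH throws IllegalArgumentException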


@ -61,13 +61,13 @@ public class TagCompressionContext {
* @param length Length of all tag bytes
* @throws IOException
*/
public void compressTags(OutputStream out, byte[] in, int offset, short length)
public void compressTags(OutputStream out, byte[] in, int offset, int length)
throws IOException {
int pos = offset;
int endOffset = pos + length;
assert pos < endOffset;
while (pos < endOffset) {
short tagLen = Bytes.toShort(in, pos);
int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
pos += Tag.TAG_LENGTH_SIZE;
write(in, pos, tagLen, out);
pos += tagLen;
@ -81,7 +81,7 @@ public class TagCompressionContext {
* @param length Length of all tag bytes
* @throws IOException
*/
public void compressTags(OutputStream out, ByteBuffer in, short length) throws IOException {
public void compressTags(OutputStream out, ByteBuffer in, int length) throws IOException {
if (in.hasArray()) {
compressTags(out, in.array(), in.arrayOffset() + in.position(), length);
ByteBufferUtils.skip(in, length);
@ -100,15 +100,14 @@ public class TagCompressionContext {
* @param length Length of all tag bytes
* @throws IOException
*/
public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
throws IOException {
int endOffset = offset + length;
while (offset < endOffset) {
byte status = (byte) src.read();
if (status == Dictionary.NOT_IN_DICTIONARY) {
// We are writing short as tagLen. So can downcast this without any risk.
short tagLen = (short) StreamUtils.readRawVarint32(src);
offset = Bytes.putShort(dest, offset, tagLen);
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
offset += tagLen;
@ -118,7 +117,7 @@ public class TagCompressionContext {
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
}
offset = Bytes.putShort(dest, offset, (short) entry.length);
offset = Bytes.putAsShort(dest, offset, entry.length);
System.arraycopy(entry, 0, dest, offset, entry.length);
offset += entry.length;
}
@ -140,11 +139,10 @@ public class TagCompressionContext {
int endOffset = offset + length;
while (offset < endOffset) {
byte status = src.get();
short tagLen;
int tagLen;
if (status == Dictionary.NOT_IN_DICTIONARY) {
// We are writing short as tagLen. So can downcast this without any risk.
tagLen = (short) StreamUtils.readRawVarint32(src);
offset = Bytes.putShort(dest, offset, tagLen);
tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
src.get(dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
offset += tagLen;
@ -154,8 +152,8 @@ public class TagCompressionContext {
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
}
tagLen = (short) entry.length;
offset = Bytes.putShort(dest, offset, tagLen);
tagLen = entry.length;
offset = Bytes.putAsShort(dest, offset, tagLen);
System.arraycopy(entry, 0, dest, offset, tagLen);
offset += tagLen;
}
@ -170,7 +168,7 @@ public class TagCompressionContext {
* @param length Length of all tag bytes
* @throws IOException
*/
public void uncompressTags(InputStream src, ByteBuffer dest, short length) throws IOException {
public void uncompressTags(InputStream src, ByteBuffer dest, int length) throws IOException {
if (dest.hasArray()) {
uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), length);
} else {
@ -180,7 +178,7 @@ public class TagCompressionContext {
}
}
private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
short dictIdx = Dictionary.NOT_IN_DICTIONARY;
if (tagDict != null) {
dictIdx = tagDict.findEntry(data, offset, length);


@ -268,8 +268,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
public short getTagsLength() {
return (short) tagsLength;
public int getTagsLength() {
return tagsLength;
}
@Override
@ -312,7 +312,6 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
}
}
/**
@ -463,8 +462,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
public short getTagsLength() {
return (short) tagsLength;
public int getTagsLength() {
return tagsLength;
}
@Override
@ -585,7 +584,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
currentBuffer.arrayOffset() + current.valueOffset,
current.valueLength);
if (current.tagsLength > 0) {
kvBuffer.putShort((short) current.tagsLength);
// Put short as unsigned
kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
kvBuffer.put((byte) (current.tagsLength & 0xff));
if (current.tagsOffset != -1) {
// the offset of the tags bytes in the underlying buffer is marked. So the temp
// buffer, tagsBuffer, has not been used.
@ -833,7 +834,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
int size = 0;
if (encodingCtx.getHFileContext().isIncludesTags()) {
short tagsLength = kv.getTagsLength();
int tagsLength = kv.getTagsLength();
ByteBufferUtils.putCompressedInt(out, tagsLength);
// There are some tags to be written
if (tagsLength > 0) {
@ -863,8 +864,10 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected final void afterDecodingKeyValue(DataInputStream source,
ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
if (decodingCtx.getHFileContext().isIncludesTags()) {
short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
dest.putShort(tagsLength);
int tagsLength = ByteBufferUtils.readCompressedInt(source);
// Put as unsigned short
dest.put((byte) ((tagsLength >> 8) & 0xff));
dest.put((byte) (tagsLength & 0xff));
if (tagsLength > 0) {
TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
// When tag compression is been used in this file, tagCompressionContext will have a not
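The write side mirrors the read side: an int length that has already been validated to fit in 16 bits is emitted high byte first as two raw bytes, avoiding any narrowing cast to short. Sketch only, with an assumed destination buffer dest and an illustrative length:

    int tagsLength = 40000;                          // caller has ensured this is <= 65535
    dest.put((byte) ((tagsLength >> 8) & 0xff));     // high byte
    dest.put((byte) (tagsLength & 0xff));            // low byte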


@ -48,7 +48,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
// Write the additional tag into the stream
if (encodingContext.getHFileContext().isIncludesTags()) {
short tagsLength = kv.getTagsLength();
int tagsLength = kv.getTagsLength();
out.writeShort(tagsLength);
if (tagsLength > 0) {
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
@ -88,7 +88,8 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
if (includesTags()) {
current.tagsLength = currentBuffer.getShort();
// Read short as unsigned, high byte first
current.tagsLength = ((currentBuffer.get() & 0xff) << 8) ^ (currentBuffer.get() & 0xff);
ByteBufferUtils.skip(currentBuffer, current.tagsLength);
}
if (includesMvcc()) {


@ -114,11 +114,11 @@ public class EncodedDataBlock {
int offset = decompressedData.position();
int klen = decompressedData.getInt();
int vlen = decompressedData.getInt();
short tagsLen = 0;
int tagsLen = 0;
ByteBufferUtils.skip(decompressedData, klen + vlen);
// Read the tag length in case the stream contains tags
if (meta.isIncludesTags()) {
tagsLen = decompressedData.getShort();
tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
ByteBufferUtils.skip(decompressedData, tagsLen);
}
KeyValue kv = new KeyValue(decompressedData.array(), offset,
@ -227,7 +227,7 @@ public class EncodedDataBlock {
ByteBuffer in = getUncompressedBuffer();
in.rewind();
int klength, vlength;
short tagsLength = 0;
int tagsLength = 0;
long memstoreTS = 0L;
KeyValue kv = null;
while (in.hasRemaining()) {
@ -236,7 +236,7 @@ public class EncodedDataBlock {
vlength = in.getInt();
ByteBufferUtils.skip(in, klength + vlength);
if (this.meta.isIncludesTags()) {
tagsLength = in.getShort();
tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
ByteBufferUtils.skip(in, tagsLength);
}
if (this.meta.isIncludesMvcc()) {


@ -758,6 +758,28 @@ public class Bytes {
return n;
}
/**
* Converts a byte array to an int value
* @param bytes byte array
* @param offset offset into array
* @param length how many bytes should be considered for creating int
* @return the int value
* @throws IllegalArgumentException if there's not enough room in the array at the offset
* indicated.
*/
public static int readAsInt(byte[] bytes, int offset, final int length) {
if (offset + length > bytes.length) {
throw new IllegalArgumentException("offset (" + offset + ") + length (" + length
+ ") exceed the" + " capacity of the array: " + bytes.length);
}
int n = 0;
for(int i = offset; i < (offset + length); i++) {
n <<= 8;
n ^= bytes[i] & 0xFF;
}
return n;
}
/**
* Put an int value out to the specified byte array position.
* @param bytes the byte array
@ -865,6 +887,29 @@ public class Bytes {
return offset + SIZEOF_SHORT;
}
/**
* Put an int value out to the specified byte array position as a short. Only the lower 2 bytes of
* the int will be put into the array. The caller of the API needs to make sure they will not
* lose the value by doing so. This is useful to store an unsigned short which is represented as
* an int in other parts.
* @param bytes the byte array
* @param offset position in the array
* @param val value to write out
* @return incremented offset
* @throws IllegalArgumentException if the byte array given doesn't have
* enough room at the offset specified.
*/
public static int putAsShort(byte[] bytes, int offset, int val) {
if (bytes.length - offset < SIZEOF_SHORT) {
throw new IllegalArgumentException("Not enough room to put a short at"
+ " offset " + offset + " in a " + bytes.length + " byte array");
}
bytes[offset+1] = (byte) val;
val >>= 8;
bytes[offset] = (byte) val;
return offset + SIZEOF_SHORT;
}
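A quick round trip through the two helpers added above, which is how the tags-length field is handled throughout this patch (the value is illustrative):

    byte[] buf = new byte[2];
    Bytes.putAsShort(buf, 0, 40000);                          // stores only the lower 2 bytes of the int
    int back = Bytes.readAsInt(buf, 0, Bytes.SIZEOF_SHORT);   // 40000
    short lossy = Bytes.toShort(buf, 0);                      // -25536: why the signed accessor is avoided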
/**
* Convert a BigDecimal value to a byte array
*


@ -575,7 +575,8 @@ public class TestKeyValue extends TestCase {
Bytes.equals(next.getValue(), metaValue2);
assertFalse(tagItr.hasNext());
tagItr = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
tagItr = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
kv.getTagsLength());
assertTrue(tagItr.hasNext());
next = tagItr.next();
assertEquals(10, next.getTagLength());


@ -47,11 +47,11 @@ public class TestTagCompressionContext {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(2);
short tagsLength1 = kv1.getTagsLength();
int tagsLength1 = kv1.getTagsLength();
ByteBuffer ib = ByteBuffer.wrap(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
context.compressTags(baos, ib, tagsLength1);
KeyValue kv2 = createKVWithTags(3);
short tagsLength2 = kv2.getTagsLength();
int tagsLength2 = kv2.getTagsLength();
ib = ByteBuffer.wrap(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
context.compressTags(baos, ib, tagsLength2);
@ -73,10 +73,10 @@ public class TestTagCompressionContext {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
KeyValue kv1 = createKVWithTags(1);
short tagsLength1 = kv1.getTagsLength();
int tagsLength1 = kv1.getTagsLength();
context.compressTags(baos, kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1);
KeyValue kv2 = createKVWithTags(3);
short tagsLength2 = kv2.getTagsLength();
int tagsLength2 = kv2.getTagsLength();
context.compressTags(baos, kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2);
context.clear();


@ -35,8 +35,10 @@ public class TestByteRangeWithKVSerialization {
pbr.putInt(kv.getValueLength());
pbr.put(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
pbr.put(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
pbr.putShort(kv.getTagsLength());
pbr.put(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
int tagsLen = kv.getTagsLength();
pbr.put((byte) (tagsLen >> 8 & 0xff));
pbr.put((byte) (tagsLen & 0xff));
pbr.put(kv.getTagsArray(), kv.getTagsOffset(), tagsLen);
pbr.putVLong(kv.getMvccVersion());
}
@ -45,7 +47,7 @@ public class TestByteRangeWithKVSerialization {
int keyLen = pbr.getInt();
int valLen = pbr.getInt();
pbr.setPosition(pbr.getPosition() + keyLen + valLen); // Skip the key and value section
short tagsLen = pbr.getShort();
int tagsLen = ((pbr.get() & 0xff) << 8) ^ (pbr.get() & 0xff);
pbr.setPosition(pbr.getPosition() + tagsLen); // Skip the tags section
long mvcc = pbr.getVLong();
KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos,
@ -82,8 +84,9 @@ public class TestByteRangeWithKVSerialization {
Assert.assertTrue(kv.equals(kv1));
Assert.assertTrue(Bytes.equals(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
kv1.getValueArray(), kv1.getValueOffset(), kv1.getValueLength()));
Assert.assertTrue(Bytes.equals(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength(),
kv1.getTagsArray(), kv1.getTagsOffset(), kv1.getTagsLength()));
Assert.assertTrue(Bytes.equals(kv.getTagsArray(), kv.getTagsOffset(),
kv.getTagsLength(), kv1.getTagsArray(), kv1.getTagsOffset(),
kv1.getTagsLength()));
Assert.assertEquals(kv1.getMvccVersion(), kv.getMvccVersion());
}
}


@ -463,7 +463,7 @@ public class PrefixTreeArrayScanner extends PrefixTreeCell implements CellScanne
protected void populateTag() {
int tagTreeIndex = currentRowNode.getTagOffset(currentCellIndex, blockMeta);
tagsOffset = tagsReader.populateBuffer(tagTreeIndex).getColumnOffset();
tagsLength = (short)tagsReader.getColumnLength();
tagsLength = tagsReader.getColumnLength();
}
protected void populateTimestamp() {


@ -72,7 +72,7 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
protected byte[] tagsBuffer;
protected int tagsOffset;
protected short tagsLength;
protected int tagsLength;
/********************** Cell methods ******************/
@ -229,7 +229,7 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
}
@Override
public short getTagsLength() {
public int getTagsLength() {
return tagsLength;
}
@ -237,5 +237,4 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
public byte[] getTagsArray() {
return this.tagsBuffer;
}
}


@ -216,7 +216,8 @@ public class HFileReaderV3 extends HFileReaderV2 {
}
ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
if (reader.hfileContext.isIncludesTags()) {
currTagsLen = blockBuffer.getShort();
// Read short as unsigned, high byte first
currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
@ -263,7 +264,8 @@ public class HFileReaderV3 extends HFileReaderV2 {
}
ByteBufferUtils.skip(blockBuffer, klen + vlen);
if (reader.hfileContext.isIncludesTags()) {
tlen = blockBuffer.getShort();
// Read short as unsigned, high byte first
tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
if (tlen < 0 || tlen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "


@ -85,7 +85,7 @@ public class HFileWriterV3 extends HFileWriterV2 {
public void append(final KeyValue kv) throws IOException {
// Currently get the complete arrays
super.append(kv);
short tagsLength = kv.getTagsLength();
int tagsLength = kv.getTagsLength();
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}
@ -126,7 +126,7 @@ public class HFileWriterV3 extends HFileWriterV2 {
pos = Bytes.putBytes(b, pos, key, 0, key.length);
pos = Bytes.putBytes(b, pos, value, 0, value.length);
if (tag.length > 0) {
pos = Bytes.putShort(b, pos, (short) tag.length);
pos = Bytes.putAsShort(b, pos, tag.length);
Bytes.putBytes(b, pos, tag, 0, tag.length);
}
append(new KeyValue(b, 0, kvlen));


@ -54,7 +54,7 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
// Write the additional tag into the stream
if (encodingCtx.getHFileContext().isIncludesTags()) {
short tagsLength = kv.getTagsLength();
int tagsLength = kv.getTagsLength();
out.writeShort(tagsLength);
if (tagsLength > 0) {
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);


@ -170,7 +170,7 @@ public class WALCellCodec implements Codec {
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
StreamUtils.writeRawVInt32(out, kv.getValueLength());
// To support tags
short tagsLength = kv.getTagsLength();
int tagsLength = kv.getTagsLength();
StreamUtils.writeRawVInt32(out, tagsLength);
// Write row, qualifier, and family; use dictionary
@ -227,7 +227,7 @@ public class WALCellCodec implements Codec {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
short tagsLength = (short) StreamUtils.readRawVarint32(in);
int tagsLength = StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@ -266,7 +266,7 @@ public class WALCellCodec implements Codec {
// tags
if (tagsLength > 0) {
pos = Bytes.putShort(backingArray, pos, tagsLength);
pos = Bytes.putAsShort(backingArray, pos, tagsLength);
if (compression.tagCompressionContext != null) {
compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
} else {
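In the WAL codec the tags length travels as a raw varint, which has no 16-bit ceiling; only when the KeyValue buffer is rebuilt is it laid down in the 2-byte field via Bytes.putAsShort. A hedged sketch of the varint leg, assuming the StreamUtils helpers referenced above:

    java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
    StreamUtils.writeRawVInt32(out, 40000);                       // varints comfortably exceed 32767
    int tagsLength = StreamUtils.readRawVarint32(
        new java.io.ByteArrayInputStream(out.toByteArray()));     // 40000, no (short) cast needed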


@ -180,8 +180,8 @@ public class VisibilityUtils {
cell.getTagsLength());
while (tagsIterator.hasNext()) {
Tag tag = tagsIterator.next();
if(tag.getType() == VisibilityUtils.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE) {
int serializationVersion = Bytes.toShort(tag.getValue());
if (tag.getType() == VisibilityUtils.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE) {
int serializationVersion = Bytes.toShort(tag.getBuffer());
if (serializationVersion == VisibilityConstants.VISIBILITY_SERIALIZATION_VERSION) {
sortedOrder = true;
continue;


@ -237,7 +237,7 @@ public class TestHFileWriterV3 {
buf.get(value);
byte[] tagValue = null;
if (useTags) {
int tagLen = buf.getShort();
int tagLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
tagValue = new byte[tagLen];
buf.get(tagValue);
}


@ -27,6 +27,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -63,7 +64,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
* Class that tests tags
*/
@Category(MediumTests.class)
public class TestTags {
@ -123,9 +124,9 @@ public class TestTags {
table.put(put);
admin.flush(tableName.getName());
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 0)) {
while (!(store.getStorefilesCount() > 0)) {
Thread.sleep(10);
}
}
@ -137,9 +138,9 @@ public class TestTags {
table.put(put1);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 1)) {
while (!(store.getStorefilesCount() > 1)) {
Thread.sleep(10);
}
}
@ -152,15 +153,15 @@ public class TestTags {
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 2)) {
while (!(store.getStorefilesCount() > 2)) {
Thread.sleep(10);
}
}
result(fam, row, qual, row2, table, value, value2, row1, value1);
admin.compact(tableName.getName());
while(admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
Thread.sleep(10);
}
result(fam, row, qual, row2, table, value, value2, row1, value1);
@ -201,9 +202,9 @@ public class TestTags {
table.put(put);
admin.flush(tableName.getName());
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 0)) {
while (!(store.getStorefilesCount() > 0)) {
Thread.sleep(10);
}
}
@ -214,9 +215,9 @@ public class TestTags {
table.put(put1);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 1)) {
while (!(store.getStorefilesCount() > 1)) {
Thread.sleep(10);
}
}
@ -228,9 +229,9 @@ public class TestTags {
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
for (HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 2)) {
while (!(store.getStorefilesCount() > 2)) {
Thread.sleep(10);
}
}
@ -240,7 +241,7 @@ public class TestTags {
Result[] next = scanner.next(3);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
boolean advance = cellScanner.advance();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
}
@ -249,7 +250,7 @@ public class TestTags {
scanner.close();
}
admin.compact(tableName.getName());
while(admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
Thread.sleep(10);
}
s = new Scan(row);
@ -258,7 +259,7 @@ public class TestTags {
Result[] next = scanner.next(3);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
boolean advance = cellScanner.advance();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
}
@ -273,128 +274,135 @@ public class TestTags {
}
}
}
@Test
public void testFlushAndCompactionwithCombinations() throws Exception {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
byte[] qual = Bytes.toBytes("qual");
byte[] row1 = Bytes.toBytes("rowb");
byte[] row2 = Bytes.toBytes("rowc");
byte[] rowd = Bytes.toBytes("rowd");
byte[] rowe = Bytes.toBytes("rowe");
HTable table = null;
try {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
byte[] qual = Bytes.toBytes("qual");
byte[] row1 = Bytes.toBytes("rowb");
byte[] row2 = Bytes.toBytes("rowc");
byte[] rowd = Bytes.toBytes("rowd");
byte[] rowe = Bytes.toBytes("rowe");
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor colDesc = new HColumnDescriptor(fam);
colDesc.setBlockCacheEnabled(true);
// colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
colDesc.setDataBlockEncoding(encoding);
desc.addFamily(colDesc);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.createTable(desc);
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName.getName());
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 0)) {
Thread.sleep(10);
}
}
put1 = new Put(row2);
value1 = Bytes.toBytes("1000dfsdf");
put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 1)) {
Thread.sleep(10);
}
}
Put put2 = new Put(rowd);
byte[] value2 = Bytes.toBytes("1000dfsdf");
put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
table.put(put2);
put2 = new Put(rowe);
value2 = Bytes.toBytes("1000dfsddfdf");
put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put2);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for(HRegion region : regions) {
Store store = region.getStore(fam);
while(!(store.getStorefilesCount() > 2)) {
Thread.sleep(10);
}
}
Scan s = new Scan(row);
ResultScanner scanner = table.getScanner(s);
try {
Result[] next = scanner.next(5);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
boolean advance = cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
// System.out.println(current);
int tagsLength = current.getTagsLength();
if (tagsLength == 0) {
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
} else {
// even if taglength is going to be > 0 the byte array would be same
assertTrue(current.getValueOffset() + current.getValueLength() != current.getLength());
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
int bigTagLen = Short.MAX_VALUE + 5;
put.setAttribute("visibility", new byte[bigTagLen]);
table.put(put);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName.getName());
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for (HRegion region : regions) {
Store store = region.getStore(fam);
while (!(store.getStorefilesCount() > 0)) {
Thread.sleep(10);
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}
while(admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
Thread.sleep(10);
}
s = new Scan(row);
scanner = table.getScanner(s);
try {
Result[] next = scanner.next(5);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
boolean advance = cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
// System.out.println(current);
if (current.getTagsLength() == 0) {
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
} else {
// even if taglength is going to be > 0 the byte array would be same
assertTrue(current.getValueOffset() + current.getValueLength() != current.getLength());
put1 = new Put(row2);
value1 = Bytes.toBytes("1000dfsdf");
put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for (HRegion region : regions) {
Store store = region.getStore(fam);
while (!(store.getStorefilesCount() > 1)) {
Thread.sleep(10);
}
}
} finally {
if (scanner != null) {
scanner.close();
Put put2 = new Put(rowd);
byte[] value2 = Bytes.toBytes("1000dfsdf");
put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
table.put(put2);
put2 = new Put(rowe);
value2 = Bytes.toBytes("1000dfsddfdf");
put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put2);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
for (HRegion region : regions) {
Store store = region.getStore(fam);
while (!(store.getStorefilesCount() > 2)) {
Thread.sleep(10);
}
}
}
} finally {
if (table != null) {
table.close();
TestCoprocessorForTags.checkTagPresence = true;
Scan s = new Scan(row);
s.setCaching(1);
ResultScanner scanner = table.getScanner(s);
try {
Result next = null;
while ((next = scanner.next()) != null) {
CellScanner cellScanner = next.cellScanner();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
if (CellUtil.matchingRow(current, row)) {
assertEquals(1, TestCoprocessorForTags.tags.size());
Tag tag = TestCoprocessorForTags.tags.get(0);
assertEquals(bigTagLen, tag.getTagLength());
} else {
assertEquals(0, TestCoprocessorForTags.tags.size());
}
}
} finally {
if (scanner != null) {
scanner.close();
}
TestCoprocessorForTags.checkTagPresence = false;
}
while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
Thread.sleep(10);
}
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(s);
try {
Result next = null;
while ((next = scanner.next()) != null) {
CellScanner cellScanner = next.cellScanner();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
if (CellUtil.matchingRow(current, row)) {
assertEquals(1, TestCoprocessorForTags.tags.size());
Tag tag = TestCoprocessorForTags.tags.get(0);
assertEquals(bigTagLen, tag.getTagLength());
} else {
assertEquals(0, TestCoprocessorForTags.tags.size());
}
}
} finally {
if (scanner != null) {
scanner.close();
}
TestCoprocessorForTags.checkTagPresence = false;
}
} finally {
if (table != null) {
table.close();
}
// delete the table
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
}
}
@ -544,9 +552,6 @@ public class TestTags {
try {
scanner = table.getScanner(s);
Result next = scanner.next();
CellScanner cellScanner = next.cellScanner();
boolean advance = cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
assertTrue(Bytes.equals(next.getRow(), row));
assertTrue(Bytes.equals(next.getValue(fam, qual), value));
@ -630,7 +635,8 @@ public class TestTags {
CellScanner cellScanner = result.cellScanner();
if (cellScanner.advance()) {
Cell cell = cellScanner.current();
tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
}
}
}