HBASE-12374 Change DBEs to work with new BB based cell.

anoopsjohn 2015-07-20 23:28:45 +05:30
parent 7e4cd59820
commit 0f614a1c44
21 changed files with 746 additions and 281 deletions

View File

@ -100,6 +100,12 @@ public final class CellUtil {
return output;
}
public static byte[] cloneTags(Cell cell) {
byte[] output = new byte[cell.getTagsLength()];
copyTagTo(cell, output, 0);
return output;
}
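For illustration only (not part of this commit), a minimal usage sketch of the new helper; it assumes the Tag[]-taking KeyValue constructor, and works the same for ByteBuffer-backed cells:
Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
    1L, Bytes.toBytes("v"), new Tag[] { new Tag((byte) 1, Bytes.toBytes("t")) });
byte[] tags = CellUtil.cloneTags(cell); // serialized tags, detached from the cell's backing storage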
/**
* Returns tag value in a new byte array. If server-side, use
* {@link Tag#getBuffer()} with appropriate {@link Tag#getTagOffset()} and

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
@ -495,6 +496,8 @@ public final class HConstants {
*/
public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
/**
* Used by scanners, etc. when they want to start at the beginning of a region
*/

View File

@ -0,0 +1,210 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This is a key-only Cell implementation which is identical to {@link KeyValue.KeyOnlyKeyValue}
* with respect to key serialization but has its data in off-heap memory.
*/
@InterfaceAudience.Private
public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
private ByteBuffer buf;
private int offset = 0; // offset into the buffer where the key starts
private int length = 0; // length of the key
private short rowLen;
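// Serialized key layout (same as the KeyValue key part):
// <2 bytes row length><row><1 byte family length><family><qualifier><8 bytes timestamp><1 byte type>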
public OffheapKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) {
assert buf.isDirect();
this.buf = buf;
this.offset = offset;
this.length = length;
this.rowLen = ByteBufferUtils.toShort(this.buf, this.offset);
}
@Override
public byte[] getRowArray() {
return CellUtil.cloneRow(this);
}
@Override
public int getRowOffset() {
return 0;
}
@Override
public short getRowLength() {
return this.rowLen;
}
@Override
public byte[] getFamilyArray() {
return CellUtil.cloneFamily(this);
}
@Override
public int getFamilyOffset() {
return 0;
}
@Override
public byte getFamilyLength() {
return getFamilyLength(getFamilyLengthPosition());
}
private byte getFamilyLength(int famLenPos) {
return ByteBufferUtils.toByte(this.buf, famLenPos);
}
@Override
public byte[] getQualifierArray() {
return CellUtil.cloneQualifier(this);
}
@Override
public int getQualifierOffset() {
return 0;
}
@Override
public int getQualifierLength() {
return getQualifierLength(getRowLength(), getFamilyLength());
}
private int getQualifierLength(int rlength, int flength) {
return this.length - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0);
}
@Override
public long getTimestamp() {
return ByteBufferUtils.toLong(this.buf, getTimestampOffset());
}
private int getTimestampOffset() {
return this.offset + this.length - KeyValue.TIMESTAMP_TYPE_SIZE;
}
@Override
public byte getTypeByte() {
return ByteBufferUtils.toByte(this.buf, this.offset + this.length - 1);
}
@Override
public long getSequenceId() {
return 0;
}
@Override
public byte[] getValueArray() {
throw new IllegalArgumentException("This is a key only Cell");
}
@Override
public int getValueOffset() {
return 0;
}
@Override
public int getValueLength() {
return 0;
}
@Override
public byte[] getTagsArray() {
throw new IllegalArgumentException("This is a key only Cell");
}
@Override
public int getTagsOffset() {
return 0;
}
@Override
public int getTagsLength() {
return 0;
}
@Override
public ByteBuffer getRowByteBuffer() {
return this.buf;
}
@Override
public int getRowPositionInByteBuffer() {
return this.offset + Bytes.SIZEOF_SHORT;
}
@Override
public ByteBuffer getFamilyByteBuffer() {
return this.buf;
}
@Override
public int getFamilyPositionInByteBuffer() {
return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE;
}
// The position in the BB where the family length byte is stored.
private int getFamilyLengthPosition() {
return this.offset + Bytes.SIZEOF_SHORT + getRowLength();
}
@Override
public ByteBuffer getQualifierByteBuffer() {
return this.buf;
}
@Override
public int getQualifierPositionInByteBuffer() {
int famLenPos = getFamilyLengthPosition();
return famLenPos + Bytes.SIZEOF_BYTE + getFamilyLength(famLenPos);
}
@Override
public ByteBuffer getValueByteBuffer() {
throw new IllegalArgumentException("This is a key only Cell");
}
@Override
public int getValuePositionInByteBuffer() {
return 0;
}
@Override
public ByteBuffer getTagsByteBuffer() {
throw new IllegalArgumentException("This is a key only Cell");
}
@Override
public int getTagsPositionInByteBuffer() {
return 0;
}
@Override
public String toString() {
return CellUtil.toString(this, false);
}
}
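For illustration only (not part of this commit), a minimal usage sketch: build a properly laid out key with an ordinary KeyValue, copy its key part into a direct buffer, and wrap it:
KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("q"),
    1L, KeyValue.Type.Put);
ByteBuffer direct = ByteBuffer.allocateDirect(kv.getKeyLength());
direct.put(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
direct.rewind();
Cell keyOnly = new OffheapKeyOnlyKeyValue(direct, 0, kv.getKeyLength());
assert "row".equals(Bytes.toString(CellUtil.cloneRow(keyOnly)));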

View File

@ -127,6 +127,11 @@ public class ByteBufferOutputStream extends OutputStream {
ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
}
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
}
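For illustration only (not part of this commit): the new overload copies with absolute offsets, so even a direct source buffer is written without materializing a temporary byte[]:
ByteBufferOutputStream out = new ByteBufferOutputStream(64);
ByteBuffer src = ByteBuffer.allocateDirect(Bytes.SIZEOF_LONG);
src.putLong(42L);
out.write(src, 0, Bytes.SIZEOF_LONG); // copies bytes [0, 8) of src; src's position is left alone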
/**
* Writes an <code>int</code> to the underlying output stream as four
* bytes, high byte first.

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
@ -133,7 +134,7 @@ public class TagCompressionContext {
* @return bytes count read from source to uncompress all tags.
* @throws IOException
*/
public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
throws IOException {
int srcBeginPos = src.position();
int endOffset = offset + length;
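For illustration only (not part of this commit), a round trip through the new ByteBuff-taking signature, mirroring the updated TestTagCompressionContext later in this commit (kv here is a hypothetical tagged KeyValue):
TagCompressionContext ctx = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ctx.compressTags(baos, kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
byte[] dest = new byte[kv.getTagsLength()];
ctx.uncompressTags(new SingleByteBuff(ByteBuffer.wrap(baos.toByteArray())), dest, 0, dest.length);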

View File

@ -22,10 +22,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
@ -38,9 +41,11 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
/**
@ -102,8 +107,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
- qualCommonPrefix);
}
protected static class SeekerState implements Cell {
protected ByteBuffer currentBuffer;
protected static class SeekerState {
protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext;
protected int valueOffset = -1;
protected int keyLength;
@ -121,6 +126,15 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected long memstoreTS;
protected int nextKvOffset;
protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
// A temp Pair object reused across ByteBuff#asSubByteBuffer calls. This avoids
// excessive object creation.
private final Pair<ByteBuffer, Integer> tmpPair;
private final boolean includeTags;
public SeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
this.tmpPair = tmpPair;
this.includeTags = includeTags;
}
protected boolean isValid() {
return valueOffset != -1;
@ -200,114 +214,69 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
@Override
public byte[] getRowArray() {
return currentKey.getRowArray();
}
@Override
public int getRowOffset() {
return Bytes.SIZEOF_SHORT;
}
@Override
public short getRowLength() {
return currentKey.getRowLength();
}
@Override
public byte[] getFamilyArray() {
return currentKey.getFamilyArray();
}
@Override
public int getFamilyOffset() {
return currentKey.getFamilyOffset();
}
@Override
public byte getFamilyLength() {
return currentKey.getFamilyLength();
}
@Override
public byte[] getQualifierArray() {
return currentKey.getQualifierArray();
}
@Override
public int getQualifierOffset() {
return currentKey.getQualifierOffset();
}
@Override
public int getQualifierLength() {
return currentKey.getQualifierLength();
}
@Override
public long getTimestamp() {
return currentKey.getTimestamp();
}
@Override
public byte getTypeByte() {
return currentKey.getTypeByte();
}
@Override
public long getSequenceId() {
return memstoreTS;
}
@Override
public byte[] getValueArray() {
return currentBuffer.array();
}
@Override
public int getValueOffset() {
return currentBuffer.arrayOffset() + valueOffset;
}
@Override
public int getValueLength() {
return valueLength;
}
@Override
public byte[] getTagsArray() {
if (tagCompressionContext != null) {
return tagsBuffer;
public Cell toCell() {
// Buffer backing the value and tags part from the HFileBlock's buffer.
// When tag compression is in use, this will be only the value bytes area.
ByteBuffer valAndTagsBuffer;
int vOffset;
int valAndTagsLength = this.valueLength;
int tagsLenSerializationSize = 0;
if (this.includeTags && this.tagCompressionContext == null) {
// Include the tags part also. This will be the tags bytes plus 2 bytes for storing the tags
// length
tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
}
return currentBuffer.array();
}
@Override
public int getTagsOffset() {
if (tagCompressionContext != null) {
return 0;
this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
valAndTagsBuffer = this.tmpPair.getFirst();
vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
if (valAndTagsBuffer.hasArray()) {
return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
} else {
return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
}
return currentBuffer.arrayOffset() + tagsOffset;
}
@Override
public int getTagsLength() {
return tagsLength;
}
@Override
public String toString() {
return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
+ getValueLength() + "/seqid=" + memstoreTS;
}
public Cell shallowCopy() {
return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
int tagsLenSerializationSize) {
byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
int tOffset = 0;
if (this.includeTags) {
if (this.tagCompressionContext == null) {
tagsArray = valAndTagsBuffer.array();
tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
+ tagsLenSerializationSize;
} else {
tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
tOffset = 0;
}
}
return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
tOffset, this.tagsLength);
}
private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
int tagsLenSerializationSize) {
ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
int tOffset = 0;
if (this.includeTags) {
if (this.tagCompressionContext == null) {
tagsBuf = valAndTagsBuffer;
tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
} else {
tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
tOffset = 0;
}
}
return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
}
}
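For illustration only (not part of this commit), the asSubByteBuffer contract that toCell() relies on: for a SingleByteBuff the Pair is filled with the backing buffer itself (no copy), while a MultiByteBuff may have to copy when the requested range spans fragments:
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
Pair<ByteBuffer, Integer> tmp = new Pair<ByteBuffer, Integer>();
bb.asSubByteBuffer(1, 2, tmp);
ByteBuffer backing = tmp.getFirst(); // buffer holding bytes {2, 3}
int pos = tmp.getSecond(); // 1: where those bytes start within it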
@ -318,16 +287,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
* represented by the valueOffset and valueLength
*/
// We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
// there. So this has to be an instance of SettableSequenceId. SeekerState need not be
// SettableSequenceId as we never return that to top layers. When we have to, we make
// ClonedSeekerState from it.
protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId,
// there. So this has to be an instance of SettableSequenceId.
protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
Streamable {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
private byte[] keyOnlyBuffer;
private ByteBuffer currentBuffer;
private short rowLength;
private int familyOffset;
private byte familyLength;
@ -335,22 +301,19 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
private int qualifierLength;
private long timestamp;
private byte typeByte;
private byte[] valueBuffer;
private int valueOffset;
private int valueLength;
private int tagsLength;
private byte[] tagsBuffer;
private int tagsOffset;
private byte[] cloneTagsBuffer;
private int tagsLength;
private long seqId;
private TagCompressionContext tagCompressionContext;
protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
byte[] tagsBuffer) {
this.currentBuffer = currentBuffer;
keyOnlyBuffer = new byte[keyLength];
this.tagCompressionContext = tagCompressionContext;
protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
int tagsOffset, int tagsLength) {
this.keyOnlyBuffer = keyBuffer;
this.rowLength = rowLength;
this.familyOffset = familyOffset;
this.familyLength = familyLength;
@ -358,15 +321,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
this.qualifierLength = qualLength;
this.timestamp = timeStamp;
this.typeByte = typeByte;
this.valueLength = valueLen;
this.valueBuffer = valueBuffer;
this.valueOffset = valueOffset;
this.valueLength = valueLen;
this.tagsBuffer = tagsBuffer;
this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength;
System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
if (tagCompressionContext != null) {
this.cloneTagsBuffer = new byte[tagsLength];
System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
}
setSequenceId(seqId);
}
@ -432,12 +392,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public byte[] getValueArray() {
return currentBuffer.array();
return this.valueBuffer;
}
@Override
public int getValueOffset() {
return currentBuffer.arrayOffset() + valueOffset;
return valueOffset;
}
@Override
@ -447,18 +407,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public byte[] getTagsArray() {
if (tagCompressionContext != null) {
return cloneTagsBuffer;
}
return currentBuffer.array();
return this.tagsBuffer;
}
@Override
public int getTagsOffset() {
if (tagCompressionContext != null) {
return 0;
}
return currentBuffer.arrayOffset() + tagsOffset;
return this.tagsOffset;
}
@Override
@ -497,21 +451,237 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// Write key
out.write(keyOnlyBuffer);
// Write value
assert this.currentBuffer.hasArray();
out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset,
this.valueLength);
out.write(this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags) {
// 2 bytes of tags length followed by the tags bytes
// tags length is serialized with 2 bytes only (as a short) even though the type is int.
// As this is a non-negative number, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
if (this.tagCompressionContext != null) {
out.write(cloneTagsBuffer);
} else {
out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset,
this.tagsLength);
}
out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
}
}
protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
SettableSequenceId, Streamable {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
private ByteBuffer keyBuffer;
private short rowLength;
private int familyOffset;
private byte familyLength;
private int qualifierOffset;
private int qualifierLength;
private long timestamp;
private byte typeByte;
private ByteBuffer valueBuffer;
private int valueOffset;
private int valueLength;
private ByteBuffer tagsBuffer;
private int tagsOffset;
private int tagsLength;
private long seqId;
protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
int tagsOffset, int tagsLength) {
// The keyBuffer is always onheap
assert keyBuffer.hasArray();
assert keyBuffer.arrayOffset() == 0;
this.keyBuffer = keyBuffer;
this.rowLength = rowLength;
this.familyOffset = familyOffset;
this.familyLength = familyLength;
this.qualifierOffset = qualOffset;
this.qualifierLength = qualLength;
this.timestamp = timeStamp;
this.typeByte = typeByte;
this.valueBuffer = valueBuffer;
this.valueOffset = valueOffset;
this.valueLength = valueLen;
this.tagsBuffer = tagsBuffer;
this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength;
setSequenceId(seqId);
}
@Override
public byte[] getRowArray() {
return this.keyBuffer.array();
}
@Override
public int getRowOffset() {
return getRowPositionInByteBuffer();
}
@Override
public short getRowLength() {
return this.rowLength;
}
@Override
public byte[] getFamilyArray() {
return this.keyBuffer.array();
}
@Override
public int getFamilyOffset() {
return getFamilyPositionInByteBuffer();
}
@Override
public byte getFamilyLength() {
return this.familyLength;
}
@Override
public byte[] getQualifierArray() {
return this.keyBuffer.array();
}
@Override
public int getQualifierOffset() {
return getQualifierPositionInByteBuffer();
}
@Override
public int getQualifierLength() {
return this.qualifierLength;
}
@Override
public long getTimestamp() {
return this.timestamp;
}
@Override
public byte getTypeByte() {
return this.typeByte;
}
@Override
public long getSequenceId() {
return this.seqId;
}
@Override
public byte[] getValueArray() {
return CellUtil.cloneValue(this);
}
@Override
public int getValueOffset() {
return 0;
}
@Override
public int getValueLength() {
return this.valueLength;
}
@Override
public byte[] getTagsArray() {
return CellUtil.cloneTags(this);
}
@Override
public int getTagsOffset() {
return 0;
}
@Override
public int getTagsLength() {
return this.tagsLength;
}
@Override
public ByteBuffer getRowByteBuffer() {
return this.keyBuffer;
}
@Override
public int getRowPositionInByteBuffer() {
return Bytes.SIZEOF_SHORT;
}
@Override
public ByteBuffer getFamilyByteBuffer() {
return this.keyBuffer;
}
@Override
public int getFamilyPositionInByteBuffer() {
return this.familyOffset;
}
@Override
public ByteBuffer getQualifierByteBuffer() {
return this.keyBuffer;
}
@Override
public int getQualifierPositionInByteBuffer() {
return this.qualifierOffset;
}
@Override
public ByteBuffer getValueByteBuffer() {
return this.valueBuffer;
}
@Override
public int getValuePositionInByteBuffer() {
return this.valueOffset;
}
@Override
public ByteBuffer getTagsByteBuffer() {
return this.tagsBuffer;
}
@Override
public int getTagsPositionInByteBuffer() {
return this.tagsOffset;
}
@Override
public long heapSize() {
return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
}
@Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
writeInt(out, lenToWrite);
writeInt(out, keyBuffer.capacity());
writeInt(out, valueLength);
// Write key
out.write(keyBuffer.array());
// Write value
writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags) {
// 2 bytes of tags length followed by the tags bytes
// tags length is serialized with 2 bytes only (as a short) even though the type is int.
// As this is a non-negative number, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
}
@ -527,16 +697,30 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
throws IOException {
// ByteBufferOutputStream has a write variant which takes a ByteBuffer so that it can directly
// write bytes from the src ByteBuffer to the destination ByteBuffer. This avoids the need for
// temp array creation and copy
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).write(b, offset, length);
} else {
ByteBufferUtils.copyBufferToStream(out, b, offset, length);
}
}
protected abstract static class
BufferedEncodedSeeker<STATE extends SeekerState>
implements EncodedSeeker {
protected HFileBlockDecodingContext decodingCtx;
protected final CellComparator comparator;
protected ByteBuffer currentBuffer;
protected STATE current = createSeekerState(); // always valid
protected STATE previous = createSeekerState(); // may not be valid
protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext = null;
protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
// A temp Pair object reused across ByteBuff#asSubByteBuffer calls. This avoids
// excessive object creation.
protected final Pair<ByteBuffer, Integer> tmpPair = new Pair<ByteBuffer, Integer>();
protected STATE current, previous;
public BufferedEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
@ -549,6 +733,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
throw new RuntimeException("Failed to initialize TagCompressionContext", e);
}
}
current = createSeekerState(); // always valid
previous = createSeekerState(); // may not be valid
}
protected boolean includesMvcc() {
@ -566,7 +752,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
public void setCurrentBuffer(ByteBuffer buffer) {
public void setCurrentBuffer(ByteBuff buffer) {
if (this.tagCompressionContext != null) {
this.tagCompressionContext.clear();
}
@ -589,9 +775,10 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public ByteBuffer getValueShallowCopy() {
ByteBuffer dup = currentBuffer.duplicate();
dup.position(current.valueOffset);
dup.limit(current.valueOffset + current.valueLength);
currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
ByteBuffer dup = tmpPair.getFirst().duplicate();
dup.position(tmpPair.getSecond());
dup.limit(tmpPair.getSecond() + current.valueLength);
return dup.slice();
}
@ -601,8 +788,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
kvBuffer.putInt(current.keyLength);
kvBuffer.putInt(current.valueLength);
kvBuffer.put(current.keyBuffer, 0, current.keyLength);
ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
current.valueLength);
currentBuffer.get(kvBuffer, current.valueOffset, current.valueLength);
if (current.tagsLength > 0) {
// Put short as unsigned
kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
@ -610,8 +796,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
if (current.tagsOffset != -1) {
// the offset of the tags bytes in the underlying buffer is marked, so the temp
// buffer, tagsBuffer, was not used.
ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
current.tagsLength);
currentBuffer.get(kvBuffer, current.tagsOffset, current.tagsLength);
} else {
// When tagsOffset is marked as -1, tag compression was in use and the tags were
// uncompressed into the temp buffer, tagsBuffer. Copy them from there
@ -630,8 +815,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
public Cell getKeyValue() {
return current.shallowCopy();
public Cell getCell() {
return current.toCell();
}
@Override
@ -657,7 +842,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
protected void decodeTags() {
current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
if (tagCompressionContext != null) {
if (current.uncompressTags) {
// Tag compression is being used. Uncompress the tags into tagsBuffer
@ -669,7 +854,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
throw new RuntimeException("Exception while uncompressing tags", e);
}
} else {
ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
currentBuffer.skip(current.tagsCompressedLength);
current.uncompressTags = true;// Reset this.
}
current.tagsOffset = -1;
@ -677,7 +862,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// When tag compression is not used, avoid copying the tags bytes into tagsBuffer.
// Just mark the tags offset so the KV buffer can be created later in getKeyValueBuffer()
current.tagsOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.tagsLength);
currentBuffer.skip(current.tagsLength);
}
}
@ -844,7 +1029,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected STATE createSeekerState() {
// This will fail for non-default seeker state if the subclass does not
// override this method.
return (STATE) new SeekerState();
return (STATE) new SeekerState(this.tmpPair, this.includesTags());
}
abstract protected void decodeFirst();
@ -1017,4 +1202,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
encodingCtx.postEncoding(BlockType.DATA);
}
}
protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
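// The key slice may be heap backed (on-heap block) or direct (off-heap block);
// pick the matching key-only Cell implementation.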
if (key.hasArray()) {
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
keyLength);
} else {
return new OffheapKeyOnlyKeyValue(key, key.position(), keyLength);
}
}
}

View File

@ -71,8 +71,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
int pos = 3 * Bytes.SIZEOF_INT;
ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
// TODO : to be changed here for BBCell
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength);
return createFirstKeyCell(key, keyLength);
}
@Override
@ -91,14 +90,14 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
current.ensureSpaceForKey();
currentBuffer.get(current.keyBuffer, 0, current.keyLength);
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
currentBuffer.skip(current.valueLength);
if (includesTags()) {
// Read short as unsigned, high byte first
current.tagsLength = ((currentBuffer.get() & 0xff) << 8) ^ (currentBuffer.get() & 0xff);
ByteBufferUtils.skip(currentBuffer, current.tagsLength);
currentBuffer.skip(current.tagsLength);
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@ -107,7 +106,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
currentBuffer.skip(Bytes.SIZEOF_INT);
current.lastCommonPrefix = 0;
decodeNext();
}

View File

@ -131,14 +131,14 @@ public interface DataBlockEncoder {
* An interface which enables seeking while the underlying data is encoded.
*
* It works on one HFileBlock, but it is reusable. See
* {@link #setCurrentBuffer(ByteBuffer)}.
* {@link #setCurrentBuffer(ByteBuff)}.
*/
interface EncodedSeeker {
/**
* Set the buffer on which seeking will be done.
* @param buffer Used for seeking.
*/
void setCurrentBuffer(ByteBuffer buffer);
void setCurrentBuffer(ByteBuff buffer);
/**
* From the current position creates a cell using the key part
@ -160,10 +160,9 @@ public interface DataBlockEncoder {
ByteBuffer getKeyValueBuffer();
/**
* @return the KeyValue object at the current position. Includes memstore
* timestamp.
* @return the Cell at the current position. Includes memstore timestamp.
*/
Cell getKeyValue();
Cell getCell();
/** Set position to beginning of given block */
void rewind();
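For illustration only (not part of this commit): callers holding a plain on-heap ByteBuffer adapt to the new signature by wrapping it, as the updated tests do (seeker and encodedBytes are hypothetical):
seeker.setCurrentBuffer(new SingleByteBuff(ByteBuffer.wrap(encodedBytes)));
Cell c = seeker.getCell(); // materialized from the current seeker position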

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* Compress using:
@ -362,9 +363,14 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
protected static class DiffSeekerState extends SeekerState {
private int rowLengthWithSize;
private long timestamp;
public DiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
super(tmpPair, includeTags);
}
@Override
protected void copyFromNext(SeekerState that) {
super.copyFromNext(that);
@ -389,14 +395,12 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
if (!isFirst) {
type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
}
current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
current.valueLength =
ByteBufferUtils.readCompressedInt(currentBuffer);
current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
}
current.lastCommonPrefix =
ByteBufferUtils.readCompressedInt(currentBuffer);
current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.ensureSpaceForKey();
@ -446,8 +450,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
int timestampFitInBytes = 1 +
((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
long timestampOrDiff =
ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
timestampOrDiff = -timestampOrDiff;
}
@ -467,13 +470,13 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
currentBuffer.skip(current.valueLength);
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@ -482,7 +485,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
currentBuffer.skip(Bytes.SIZEOF_INT);
// read column family
byte familyNameLength = currentBuffer.get();
@ -500,7 +503,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected DiffSeekerState createSeekerState() {
return new DiffSeekerState();
return new DiffSeekerState(this.tmpPair, this.includesTags());
}
};
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
@ -364,8 +365,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
ByteBuff.readCompressedInt(block); // commonLength
ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
// TODO : Change to BBCell.
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
return createFirstKeyCell(key, keyLength);
}
@Override
@ -379,6 +379,10 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
private int rowLengthWithSize;
private int familyLengthWithSize;
public FastDiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
super(tmpPair, includeTags);
}
@Override
protected void copyFromNext(SeekerState that) {
super.copyFromNext(that);
@ -404,14 +408,12 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
current.prevTimestampAndType, 0,
current.prevTimestampAndType.length);
}
current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
current.valueLength =
ByteBufferUtils.readCompressedInt(currentBuffer);
current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
}
current.lastCommonPrefix =
ByteBufferUtils.readCompressedInt(currentBuffer);
current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.ensureSpaceForKey();
@ -491,14 +493,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
// handle value
if ((flag & FLAG_SAME_VALUE) == 0) {
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
currentBuffer.skip(current.valueLength);
}
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@ -507,7 +509,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
currentBuffer.skip(Bytes.SIZEOF_INT);
decode(true);
}
@ -518,7 +520,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected FastDiffSeekerState createSeekerState() {
return new FastDiffSeekerState();
return new FastDiffSeekerState(this.tmpPair, this.includesTags());
}
};
}

View File

@ -186,8 +186,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
// TODO : Change to BBCell
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
return createFirstKeyCell(key, keyLength);
}
@Override
@ -201,21 +200,20 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
@Override
protected void decodeNext() {
current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
current.lastCommonPrefix =
ByteBufferUtils.readCompressedInt(currentBuffer);
current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.keyLength += current.lastCommonPrefix;
current.ensureSpaceForKey();
currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
current.keyLength - current.lastCommonPrefix);
current.valueOffset = currentBuffer.position();
ByteBufferUtils.skip(currentBuffer, current.valueLength);
currentBuffer.skip(current.valueLength);
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@ -224,7 +222,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
currentBuffer.skip(Bytes.SIZEOF_INT);
decodeNext();
}
};

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.io.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions;
@ -84,7 +84,7 @@ public class StreamUtils {
return result;
}
public static int readRawVarint32(ByteBuffer input) throws IOException {
public static int readRawVarint32(ByteBuff input) throws IOException {
byte tmp = input.get();
if (tmp >= 0) {
return tmp;

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
/**
* An abstract class that abstracts out as to how the byte buffers are used,
@ -435,4 +436,23 @@ public abstract class ByteBuff {
}
return tmpLength;
}
/**
* Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
* {@link ByteBuff}.
*/
public static long readVLong(ByteBuff in) {
byte firstByte = in.get();
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
for (int idx = 0; idx < len-1; idx++) {
byte b = in.get();
i = i << 8;
i = i | (b & 0xFF);
}
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
}
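For illustration only (not part of this commit), a round trip against WritableUtils, which defines the vlong wire format:
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableUtils.writeVLong(new DataOutputStream(baos), -12345L);
long v = ByteBuff.readVLong(new SingleByteBuff(ByteBuffer.wrap(baos.toByteArray()))); // -12345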

View File

@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -60,11 +61,11 @@ public class TestTagCompressionContext {
byte[] dest = new byte[tagsLength1];
ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
context.uncompressTags(ob, dest, 0, tagsLength1);
context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
tagsLength1));
dest = new byte[tagsLength2];
context.uncompressTags(ob, dest, 0, tagsLength2);
context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
tagsLength2));
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.decode.PrefixTreeArraySearcher;
import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellScannerPosition;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -55,8 +56,8 @@ public class PrefixTreeSeeker implements EncodedSeeker {
}
@Override
public void setCurrentBuffer(ByteBuffer fullBlockBuffer) {
block = fullBlockBuffer;
public void setCurrentBuffer(ByteBuff fullBlockBuffer) {
block = fullBlockBuffer.asSubByteBuffer(fullBlockBuffer.limit());
// TODO : change to ByteBuff
ptSearcher = DecoderFactory.checkOut(block, includeMvccVersion);
rewind();
@ -97,7 +98,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
* currently must do deep copy into new array
*/
@Override
public Cell getKeyValue() {
public Cell getCell() {
Cell cell = ptSearcher.current();
if (cell == null) {
return null;

View File

@ -1454,8 +1454,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
+ DataBlockEncoding.getNameFromId(dataBlockEncoderId));
}
ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
// TODO : Change the DBEs to work with ByteBuffs
seeker.setCurrentBuffer(encodedBuffer.asSubByteBuffer(encodedBuffer.limit()));
seeker.setCurrentBuffer(encodedBuffer);
blockFetches++;
// Reset the next indexed key
@ -1528,7 +1527,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (block == null) {
return null;
}
return seeker.getKeyValue();
return seeker.getCell();
}
@Override

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -249,6 +248,19 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return Collections.unmodifiableList(configurations);
}
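// All 2^3 combinations of {includesMemstoreTS, includesTags, useOffheapData},
// consumed by the parameterized DBE tests below.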
public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
List<Object[]> configurations = new ArrayList<Object[]>();
configurations.add(new Object[] { false, false, true });
configurations.add(new Object[] { false, false, false });
configurations.add(new Object[] { false, true, true });
configurations.add(new Object[] { false, true, false });
configurations.add(new Object[] { true, false, true });
configurations.add(new Object[] { true, false, false });
configurations.add(new Object[] { true, true, true });
configurations.add(new Object[] { true, true, false });
return Collections.unmodifiableList(configurations);
}
public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
bloomAndCompressionCombinations();

View File

@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.io.encoding;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -47,7 +50,7 @@ public class TestBufferedDataBlockEncoder {
@Test
public void testEnsureSpaceForKey() {
BufferedDataBlockEncoder.SeekerState state =
new BufferedDataBlockEncoder.SeekerState();
new BufferedDataBlockEncoder.SeekerState(new Pair<ByteBuffer, Integer>(), false);
for (int i = 1; i <= 65536; ++i) {
state.keyLength = i;
state.ensureSpaceForKey();

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -74,14 +74,18 @@ public class TestDataBlockEncoders {
private final boolean includesMemstoreTS;
private final boolean includesTags;
private final boolean useOffheapData;
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination();
}
public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag) {
public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
boolean useOffheapData) {
this.includesMemstoreTS = includesMemstoreTS;
this.includesTags = includesTag;
this.useOffheapData = useOffheapData;
}
private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo,
@ -178,12 +182,15 @@ public class TestDataBlockEncoders {
List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
new ArrayList<DataBlockEncoder.EncodedSeeker>();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
// Off heap block data support not added for PREFIX_TREE DBE yet.
// TODO remove this once support is added. HBASE-12298
if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
DataBlockEncoder encoder = encoding.getEncoder();
if (encoder == null) {
continue;
}
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
@ -192,7 +199,7 @@ public class TestDataBlockEncoders {
.build();
DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
seeker.setCurrentBuffer(encodedBuffer);
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
encodedSeekers.add(seeker);
}
// test it!
@ -222,7 +229,7 @@ public class TestDataBlockEncoders {
}
static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
HFileBlockEncodingContext encodingContext) throws IOException {
HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
@ -236,6 +243,12 @@ public class TestDataBlockEncoders {
encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
if (useOffheapData) {
ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
bb.put(encodedData);
bb.rewind();
return bb;
}
return ByteBuffer.wrap(encodedData);
}
@ -244,12 +257,15 @@ public class TestDataBlockEncoders {
List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
// Off heap block data support not added for PREFIX_TREE DBE yet.
// TODO remove this once support is added. HBASE-12298
if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
if (encoding.getEncoder() == null) {
continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
@ -258,31 +274,19 @@ public class TestDataBlockEncoders {
.build();
DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
seeker.setCurrentBuffer(encodedBuffer);
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
int i = 0;
do {
KeyValue expectedKeyValue = sampleKv.get(i);
ByteBuffer keyValue = seeker.getKeyValueBuffer();
if (0 != Bytes.compareTo(keyValue.array(), keyValue.arrayOffset(), keyValue.limit(),
expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
expectedKeyValue.getLength())) {
int commonPrefix = 0;
byte[] left = keyValue.array();
byte[] right = expectedKeyValue.getBuffer();
int leftOff = keyValue.arrayOffset();
int rightOff = expectedKeyValue.getOffset();
int length = Math.min(keyValue.limit(), expectedKeyValue.getLength());
while (commonPrefix < length
&& left[commonPrefix + leftOff] == right[commonPrefix + rightOff]) {
commonPrefix++;
}
Cell cell = seeker.getCell();
if (CellComparator.COMPARATOR.compareKeyIgnoresMvcc(expectedKeyValue, cell) != 0) {
int commonPrefix = CellUtil
.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
fail(String.format("next() produces wrong results "
+ "encoder: %s i: %d commonPrefix: %d" + "\n expected %s\n actual %s", encoder
.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
expectedKeyValue.getOffset(), expectedKeyValue.getLength()), Bytes
.toStringBinary(keyValue)));
expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()), CellUtil.toString(
cell, false)));
}
i++;
} while (seeker.next());
@ -298,33 +302,19 @@ public class TestDataBlockEncoders {
List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
// Off heap block data support not added for PREFIX_TREE DBE yet.
// TODO remove this once support is added. HBASE-12298
if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
if (encoding.getEncoder() == null) {
continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
Cell key = encoder.getFirstKeyCellInBlock(new MultiByteBuff(
encodedBuffer));
KeyValue keyBuffer = null;
if(encoding == DataBlockEncoding.PREFIX_TREE) {
// This is not an actual case. So the Prefix tree block is not loaded in case of Prefix_tree
// Just copy only the key part to form a keyBuffer
byte[] serializedKey = CellUtil.getCellKeySerializedAsKeyValueKey(key);
keyBuffer = KeyValueUtil.createKeyValueFromKey(serializedKey);
} else {
keyBuffer = KeyValueUtil.ensureKeyValue(key);
}
getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
KeyValue firstKv = sampleKv.get(0);
if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(keyBuffer, firstKv)) {
int commonPrefix = 0;
int length = Math.min(keyBuffer.getKeyLength(), firstKv.getKeyLength());
while (commonPrefix < length
&& keyBuffer.getBuffer()[keyBuffer.getKeyOffset() + commonPrefix] == firstKv
.getBuffer()[firstKv.getKeyOffset() + commonPrefix]) {
commonPrefix++;
}
if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(key, firstKv)) {
int commonPrefix = CellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -117,27 +118,27 @@ public class TestPrefixTreeEncoding {
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
seeker.setCurrentBuffer(readBuffer);
seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
// Seek before the first keyvalue;
Cell seekKey = CellUtil.createFirstDeleteFamilyCellOnRow(getRowKey(batchId, 0), CF_BYTES);
seeker.seekToKeyInBlock(seekKey, true);
assertEquals(null, seeker.getKeyValue());
assertEquals(null, seeker.getCell());
// Seek before the middle keyvalue;
seekKey = CellUtil.createFirstDeleteFamilyCellOnRow(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3),
CF_BYTES);
seeker.seekToKeyInBlock(seekKey, true);
assertNotNull(seeker.getKeyValue());
assertNotNull(seeker.getCell());
assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3 - 1),
CellUtil.cloneRow(seeker.getKeyValue()));
CellUtil.cloneRow(seeker.getCell()));
// Seek before the last keyvalue;
seekKey = CellUtil.createFirstDeleteFamilyCellOnRow(Bytes.toBytes("zzzz"), CF_BYTES);
seeker.seekToKeyInBlock(seekKey, true);
assertNotNull(seeker.getKeyValue());
assertNotNull(seeker.getCell());
assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH - 1),
CellUtil.cloneRow(seeker.getKeyValue()));
CellUtil.cloneRow(seeker.getCell()));
}
@Test
@ -160,10 +161,10 @@ public class TestPrefixTreeEncoding {
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
seeker.setCurrentBuffer(readBuffer);
seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
Cell previousKV = null;
do {
Cell currentKV = seeker.getKeyValue();
Cell currentKV = seeker.getCell();
System.out.println(currentKV);
if (previousKV != null && CellComparator.COMPARATOR.compare(currentKV, previousKV) < 0) {
dumpInputKVSet();
@ -229,7 +230,7 @@ public class TestPrefixTreeEncoding {
List<KeyValue> kvList = new ArrayList<KeyValue>();
for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
kvList.clear();
encodeSeeker.setCurrentBuffer(encodedData);
encodeSeeker.setCurrentBuffer(new SingleByteBuff(encodedData));
KeyValue firstOnRow = KeyValueUtil.createFirstOnRow(getRowKey(batchId, i));
encodeSeeker.seekToKeyInBlock(
new KeyValue.KeyOnlyKeyValue(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
@ -243,11 +244,11 @@ public class TestPrefixTreeEncoding {
fail("Get error result after seeking " + firstOnRow);
}
if (hasMoreOfEncodeScanner) {
if (CellComparator.COMPARATOR.compare(encodeSeeker.getKeyValue(),
if (CellComparator.COMPARATOR.compare(encodeSeeker.getCell(),
collectionScanner.peek()) != 0) {
dumpInputKVSet();
fail("Expected " + collectionScanner.peek() + " actual "
+ encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
+ encodeSeeker.getCell() + ", after seeking " + firstOnRow);
}
}
}

View File

@ -21,25 +21,43 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@Category({IOTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestSeekToBlockWithEncoders {
private final boolean useOffheapData;
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
}
public TestSeekToBlockWithEncoders(boolean useOffheapData) {
this.useOffheapData = useOffheapData;
}
/**
* Test seeking while file is encoded.
*/
@ -265,10 +283,10 @@ public class TestSeekToBlockWithEncoders {
HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
encodingContext);
encodingContext, this.useOffheapData);
DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
seeker.setCurrentBuffer(encodedBuffer);
seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
encodedSeekers.add(seeker);
}
// test it!
@ -280,7 +298,7 @@ public class TestSeekToBlockWithEncoders {
Cell keyValue, KeyValue expected) {
for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
seeker.seekToKeyInBlock(keyValue, false);
Cell keyValue2 = seeker.getKeyValue();
Cell keyValue2 = seeker.getCell();
assertEquals(expected, keyValue2);
seeker.rewind();
}