HBASE-14063 Use BufferBackedCell in read path after HBASE-12213 and HBASE-12295 (ram)
This commit is contained in:
ramkrishna 2015-07-27 16:55:46 +05:30
parent ac08b992c5
commit 3f80e0ea4f
14 changed files with 663 additions and 145 deletions

View File

@ -1,5 +1,4 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -27,19 +26,37 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* This is a key only Cell implementation which is identical to {@link KeyValue.KeyOnlyKeyValue}
* with respect to key serialization but have its data in off heap memory.
* with respect to key serialization but has its data in the form of a ByteBuffer
* (onheap or offheap).
*/
@InterfaceAudience.Private
public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
public class ByteBufferedKeyOnlyKeyValue extends ByteBufferedCell {
private ByteBuffer buf;
private int offset = 0; // offset into the buffer where the key starts
private int length = 0; // length of the key
private short rowLen;
public OffheapKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) {
assert buf.isDirect();
this.buf = buf;
/**
* Used in cases where we want to avoid a lot of garbage from allocating new objects with
* different keys. Use the empty constructor and set the key using {@link #setKey(ByteBuffer, int, int)}
*/
public ByteBufferedKeyOnlyKeyValue() {
}
public ByteBufferedKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) {
setKey(buf, offset, length);
}
/**
* A setter that helps to avoid object creation every time a new
* ByteBufferedKeyOnlyKeyValue is needed.
* @param key the ByteBuffer holding the key bytes
* @param offset offset in the buffer at which the key starts
* @param length length of the key
*/
public void setKey(ByteBuffer key, int offset, int length) {
this.buf = key;
this.offset = offset;
this.length = length;
this.rowLen = ByteBufferUtils.toShort(this.buf, this.offset);
@ -47,11 +64,17 @@ public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
@Override
public byte[] getRowArray() {
if (this.buf.hasArray()) {
return this.buf.array();
}
return CellUtil.cloneRow(this);
}
@Override
public int getRowOffset() {
if (this.buf.hasArray()) {
return getRowPositionInByteBuffer() + this.buf.arrayOffset();
}
return 0;
}
@ -62,11 +85,17 @@ public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
@Override
public byte[] getFamilyArray() {
if (this.buf.hasArray()) {
return this.buf.array();
}
return CellUtil.cloneFamily(this);
}
@Override
public int getFamilyOffset() {
if (this.buf.hasArray()) {
return getFamilyPositionInByteBuffer() + this.buf.arrayOffset();
}
return 0;
}
@ -81,11 +110,17 @@ public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
@Override
public byte[] getQualifierArray() {
if (this.buf.hasArray()) {
return this.buf.array();
}
return CellUtil.cloneQualifier(this);
}
@Override
public int getQualifierOffset() {
if (this.buf.hasArray()) {
return getQualifierPositionInByteBuffer() + this.buf.arrayOffset();
}
return 0;
}
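A minimal usage sketch of the reuse pattern the class javadoc describes — one instance re-pointed via setKey() per probe. This is not part of the patch; findKey and its inputs are illustrative:

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;

public class KeyProbeSketch {
  // keyOffsets/keyLens describe serialized keys inside buf (illustrative inputs).
  static int findKey(ByteBuffer buf, int[] keyOffsets, int[] keyLens,
      CellComparator comparator, Cell searchKey) {
    // One reusable cell instead of one allocation per probed key.
    ByteBufferedKeyOnlyKeyValue probe = new ByteBufferedKeyOnlyKeyValue();
    for (int i = 0; i < keyOffsets.length; i++) {
      probe.setKey(buf, keyOffsets[i], keyLens[i]);
      if (comparator.compareKeyIgnoresMvcc(searchKey, probe) == 0) {
        return i;
      }
    }
    return -1;
  }
}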

View File

@ -37,9 +37,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
@ -2475,22 +2474,11 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
if (!withTags) {
length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
writeInt(out, length);
ByteBufferUtils.putInt(out, length);
out.write(this.bytes, this.offset, length);
return length + Bytes.SIZEOF_INT;
}
// This does the same as DataOutput#writeInt (big-endian, etc.)
public static void writeInt(OutputStream out, int v) throws IOException {
// We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
// ByteBuffer in one step.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).writeInt(v);
} else {
StreamUtils.writeInt(out, v);
}
}
/**
* Comparator that compares row component only of a KeyValue.
*/

View File

@ -589,11 +589,11 @@ public class KeyValueUtil {
int tlen = cell.getTagsLength();
// write total length
KeyValue.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags));
ByteBufferUtils.putInt(out, length(rlen, flen, qlen, vlen, tlen, withTags));
// write key length
KeyValue.writeInt(out, keyLength(rlen, flen, qlen));
ByteBufferUtils.putInt(out, keyLength(rlen, flen, qlen));
// write value length
KeyValue.writeInt(out, vlen);
ByteBufferUtils.putInt(out, vlen);
// Write rowkey - 2 bytes rk length followed by rowkey bytes
StreamUtils.writeShort(out, rlen);
out.write(cell.getRowArray(), cell.getRowOffset(), rlen);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -43,7 +44,7 @@ public class NoTagsKeyValue extends KeyValue {
public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. For any changes made here,
// please check KeyValueUtil#oswrite as well and make the necessary changes.
writeInt(out, this.length);
ByteBufferUtils.putInt(out, this.length);
out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT;
}

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
* This Cell is an implementation of {@link ByteBufferedCell} where the data resides in off heap
* memory.
*/
@InterfaceAudience.Private
public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Cloneable,
SettableSequenceId, Streamable {
protected final ByteBuffer buf;
protected final int offset;
protected final int length;
private final short rowLen;
private final int keyLen;
private long seqId = 0;
private final boolean hasTags;
// TODO : See if famLen can be cached or not?
private static final int FIXED_HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
+ ClassSize.align(ClassSize.BYTE_BUFFER) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT
+ Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG;
public OffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags) {
assert buf.isDirect();
this.buf = buf;
this.offset = offset;
this.length = length;
rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
this.hasTags = hasTags;
}
@Override
public byte[] getRowArray() {
return CellUtil.cloneRow(this);
}
@Override
public int getRowOffset() {
return 0;
}
@Override
public short getRowLength() {
return this.rowLen;
}
@Override
public byte[] getFamilyArray() {
return CellUtil.cloneFamily(this);
}
@Override
public int getFamilyOffset() {
return 0;
}
@Override
public byte getFamilyLength() {
return getFamilyLength(getFamilyLengthPosition());
}
private int getFamilyLengthPosition() {
return this.offset + KeyValue.ROW_KEY_OFFSET + rowLen;
}
private byte getFamilyLength(int famLenPos) {
return ByteBufferUtils.toByte(this.buf, famLenPos);
}
@Override
public byte[] getQualifierArray() {
return CellUtil.cloneQualifier(this);
}
@Override
public int getQualifierOffset() {
return 0;
}
@Override
public int getQualifierLength() {
return getQualifierLength(getRowLength(), getFamilyLength());
}
private int getQualifierLength(int rlength, int flength) {
return this.keyLen - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0);
}
@Override
public long getTimestamp() {
int offset = getTimestampOffset(this.keyLen);
return ByteBufferUtils.toLong(this.buf, offset);
}
private int getTimestampOffset(int keyLen) {
return this.offset + KeyValue.ROW_OFFSET + keyLen - KeyValue.TIMESTAMP_TYPE_SIZE;
}
@Override
public byte getTypeByte() {
return ByteBufferUtils.toByte(this.buf, this.offset + this.keyLen - 1 + KeyValue.ROW_OFFSET);
}
@Override
public long getSequenceId() {
return this.seqId;
}
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@Override
public byte[] getValueArray() {
return CellUtil.cloneValue(this);
}
@Override
public int getValueOffset() {
return 0;
}
@Override
public int getValueLength() {
return ByteBufferUtils.toInt(this.buf, this.offset + Bytes.SIZEOF_INT);
}
@Override
public byte[] getTagsArray() {
return CellUtil.cloneTags(this);
}
@Override
public int getTagsOffset() {
return 0;
}
@Override
public int getTagsLength() {
if (!hasTags) {
return 0;
}
int tagsLen = this.length
- (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
if (tagsLen > 0) {
// There are some tag bytes in the buffer. So remove the 2 bytes which were
// added to denote the tags length
tagsLen -= KeyValue.TAGS_LENGTH_SIZE;
}
return tagsLen;
}
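For reference, the arithmetic in getTagsLength() above follows directly from the serialized KeyValue layout this class reads; a worked example with illustrative lengths:

// Layout of a serialized KeyValue, offsets relative to 'offset':
//   4 bytes : key length (keyLen)   \  KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
//   4 bytes : value length (vlen)   /  covers these two ints (8 bytes)
//   keyLen  : key bytes (row, family, qualifier, timestamp, type)
//   vlen    : value bytes
//   2 bytes : tags length, present only when there are tags (KeyValue.TAGS_LENGTH_SIZE)
//   rest    : tag bytes
// Illustrative numbers: length = 58, keyLen = 30, vlen = 12
//   tagsLen = 58 - (30 + 12 + 8) = 8; minus the 2 byte tags-length prefix = 6 tag bytes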
@Override
public ByteBuffer getRowByteBuffer() {
return this.buf;
}
@Override
public int getRowPositionInByteBuffer() {
return this.offset + KeyValue.ROW_KEY_OFFSET;
}
@Override
public ByteBuffer getFamilyByteBuffer() {
return this.buf;
}
@Override
public int getFamilyPositionInByteBuffer() {
return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE;
}
@Override
public ByteBuffer getQualifierByteBuffer() {
return this.buf;
}
@Override
public int getQualifierPositionInByteBuffer() {
return getFamilyPositionInByteBuffer() + getFamilyLength();
}
@Override
public ByteBuffer getValueByteBuffer() {
return this.buf;
}
@Override
public int getValuePositionInByteBuffer() {
return this.offset + KeyValue.ROW_OFFSET + this.keyLen;
}
@Override
public ByteBuffer getTagsByteBuffer() {
return this.buf;
}
@Override
public int getTagsPositionInByteBuffer() {
int tagsLen = getTagsLength();
if (tagsLen == 0) {
return this.offset + this.length;
}
return this.offset + this.length - tagsLen;
}
@Override
public long heapSize() {
return ClassSize.align(FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(length));
}
@Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
// In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. For any
// changes made here, please check KeyValueUtil#oswrite as well and make the necessary changes.
int length = this.length;
if (hasTags && !withTags) {
length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
ByteBufferUtils.putInt(out, length);
ByteBufferUtils.writeByteBuffer(out, this.buf, this.offset, length);
return length + Bytes.SIZEOF_INT;
}
@Override
public String toString() {
return CellUtil.toString(this, true);
}
}

View File

@ -28,13 +28,12 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@ -445,9 +444,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
writeInt(out, lenToWrite);
writeInt(out, keyOnlyBuffer.length);
writeInt(out, valueLength);
ByteBufferUtils.putInt(out, lenToWrite);
ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
ByteBufferUtils.putInt(out, valueLength);
// Write key
out.write(keyOnlyBuffer);
// Write value
@ -668,47 +667,25 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
writeInt(out, lenToWrite);
writeInt(out, keyBuffer.capacity());
writeInt(out, valueLength);
ByteBufferUtils.putInt(out, lenToWrite);
ByteBufferUtils.putInt(out, keyBuffer.capacity());
ByteBufferUtils.putInt(out, valueLength);
// Write key
out.write(keyBuffer.array());
// Write value
writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
ByteBufferUtils.writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags) {
// 2 bytes tags length followed by tags bytes
// tags length is serialized with only 2 bytes (as a short) even though the type is int.
// As these are non-negative numbers, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
ByteBufferUtils.writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
}
}
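The two raw write(byte) calls above emit the tags length as an unsigned big-endian short; with an illustrative tagsLength of 300:

// tagsLength = 300 = 0x012C
// (byte) (0xff & (300 >> 8)) -> 0x01 (high byte, written first)
// (byte) (0xff & 300)        -> 0x2C (low byte)
// A reader reassembles it as ((0x01 & 0xff) << 8) | (0x2C & 0xff) = 300.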
private static void writeInt(OutputStream out, int v) throws IOException {
// We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
// ByteBuffer in one step.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).writeInt(v);
} else {
StreamUtils.writeInt(out, v);
}
}
private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
throws IOException {
// We have write which takes ByteBuffer in ByteBufferOutputStream so that it can directly write
// bytes from the src ByteBuffer to the destination ByteBuffer. This avoid need for temp array
// creation and copy
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).write(b, offset, length);
} else {
ByteBufferUtils.copyBufferToStream(out, b, offset, length);
}
}
protected abstract static class
BufferedEncodedSeeker<STATE extends SeekerState>
implements EncodedSeeker {
@ -1166,7 +1143,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
}
ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
}
@ -1208,7 +1185,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
keyLength);
} else {
return new OffheapKeyOnlyKeyValue(key, key.position(), keyLength);
return new ByteBufferedKeyOnlyKeyValue(key, key.position(), keyLength);
}
}
}

View File

@ -27,6 +27,8 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
@ -134,8 +136,13 @@ public final class ByteBufferUtils {
*/
public static void putInt(OutputStream out, final int value)
throws IOException {
for (int i = Bytes.SIZEOF_INT - 1; i >= 0; --i) {
out.write((byte) (value >>> (i * 8)));
// We have writeInt in ByteBufferOutputStream so that it can directly write the
// int to the underlying ByteBuffer in one step.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).writeInt(value);
} else {
StreamUtils.writeInt(out, value);
}
}
@ -829,4 +836,41 @@ public final class ByteBufferUtils {
}
}
}
public static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
throws IOException {
// We have a write that takes a ByteBuffer in ByteBufferOutputStream so that it can
// directly write bytes from the src ByteBuffer to the destination ByteBuffer. This
// avoids the need for temp array creation and copy.
if (out instanceof ByteBufferOutputStream) {
((ByteBufferOutputStream) out).write(b, offset, length);
} else {
ByteBufferUtils.copyBufferToStream(out, b, offset, length);
}
}
// For testing purposes
public static String toStringBinary(final ByteBuffer b, int off, int len) {
StringBuilder result = new StringBuilder();
// Just in case we are passed a 'len' that is > buffer length...
if (off >= b.capacity())
return result.toString();
if (off + len > b.capacity())
len = b.capacity() - off;
for (int i = off; i < off + len; ++i) {
int ch = b.get(i) & 0xFF;
if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
|| " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
result.append((char) ch);
} else {
result.append(String.format("\\x%02X", ch));
}
}
return result.toString();
}
public static String toStringBinary(final ByteBuffer b) {
return toStringBinary(b, 0, b.capacity());
}
}
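Both new helpers dispatch on the concrete stream type: a ByteBufferOutputStream takes the zero-copy path into its backing buffer, while any other stream falls back to a plain stream write. A rough, self-contained usage sketch (capacity and payload are illustrative):

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

public class PutIntSketch {
  public static void main(String[] args) throws Exception {
    ByteBuffer src = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });
    // Fast path: the int and the buffer bytes land directly in the backing ByteBuffer.
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(32);
    ByteBufferUtils.putInt(bbos, 4);
    ByteBufferUtils.writeByteBuffer(bbos, src, 0, 4);
    // Fallback path: the same bytes go through the generic OutputStream writes.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteBufferUtils.putInt(baos, 4);
    ByteBufferUtils.writeByteBuffer(baos, src, 0, 4);
  }
}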

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MiscTests.class, SmallTests.class })
public class TestOffheapKeyValue {
private static final String QUAL2 = "qual2";
private static final String FAM2 = "fam2";
private static final String QUAL1 = "qual1";
private static final String FAM1 = "fam1";
private static final String ROW1 = "row1";
private static final byte[] row1 = Bytes.toBytes(ROW1);
private static final byte[] fam1 = Bytes.toBytes(FAM1);
private static final byte[] fam2 = Bytes.toBytes(FAM2);
private static final byte[] qual1 = Bytes.toBytes(QUAL1);
private static final byte[] qual2 = Bytes.toBytes(QUAL2);
private static final Tag t1 = new Tag((byte) 1, Bytes.toBytes("TAG1"));
private static final Tag t2 = new Tag((byte) 2, Bytes.toBytes("TAG2"));
private static final ArrayList<Tag> tags = new ArrayList<Tag>();
static {
tags.add(t1);
tags.add(t2);
}
@Test
public void testByteBufferBackedKeyValue() throws Exception {
KeyValue kvCell = new KeyValue(row1, fam1, qual1, 0L, Type.Put, row1);
ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
offheapKV.getRowPositionInByteBuffer(), offheapKV.getRowLength()));
assertEquals(
FAM1,
ByteBufferUtils.toStringBinary(offheapKV.getFamilyByteBuffer(),
offheapKV.getFamilyPositionInByteBuffer(), offheapKV.getFamilyLength()));
assertEquals(
QUAL1,
ByteBufferUtils.toStringBinary(offheapKV.getQualifierByteBuffer(),
offheapKV.getQualifierPositionInByteBuffer(), offheapKV.getQualifierLength()));
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getValueByteBuffer(),
offheapKV.getValuePositionInByteBuffer(), offheapKV.getValueLength()));
assertEquals(0L, offheapKV.getTimestamp());
assertEquals(Type.Put.getCode(), offheapKV.getTypeByte());
// Use the array() APIs
assertEquals(
ROW1,
Bytes.toStringBinary(offheapKV.getRowArray(),
offheapKV.getRowOffset(), offheapKV.getRowLength()));
assertEquals(
FAM1,
Bytes.toStringBinary(offheapKV.getFamilyArray(),
offheapKV.getFamilyOffset(), offheapKV.getFamilyLength()));
assertEquals(
QUAL1,
Bytes.toStringBinary(offheapKV.getQualifierArray(),
offheapKV.getQualifierOffset(), offheapKV.getQualifierLength()));
assertEquals(
ROW1,
Bytes.toStringBinary(offheapKV.getValueArray(),
offheapKV.getValueOffset(), offheapKV.getValueLength()));
assertEquals(0L, offheapKV.getTimestamp());
assertEquals(Type.Put.getCode(), offheapKV.getTypeByte());
kvCell = new KeyValue(row1, fam2, qual2, 0L, Type.Put, row1);
buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
assertEquals(
FAM2,
ByteBufferUtils.toStringBinary(offheapKV.getFamilyByteBuffer(),
offheapKV.getFamilyPositionInByteBuffer(), offheapKV.getFamilyLength()));
assertEquals(
QUAL2,
ByteBufferUtils.toStringBinary(offheapKV.getQualifierByteBuffer(),
offheapKV.getQualifierPositionInByteBuffer(), offheapKV.getQualifierLength()));
byte[] nullQualifier = new byte[0];
kvCell = new KeyValue(row1, fam1, nullQualifier, 0L, Type.Put, row1);
buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
offheapKV.getRowPositionInByteBuffer(), offheapKV.getRowLength()));
assertEquals(
FAM1,
ByteBufferUtils.toStringBinary(offheapKV.getFamilyByteBuffer(),
offheapKV.getFamilyPositionInByteBuffer(), offheapKV.getFamilyLength()));
assertEquals(
"",
ByteBufferUtils.toStringBinary(offheapKV.getQualifierByteBuffer(),
offheapKV.getQualifierPositionInByteBuffer(), offheapKV.getQualifierLength()));
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getValueByteBuffer(),
offheapKV.getValuePositionInByteBuffer(), offheapKV.getValueLength()));
assertEquals(0L, offheapKV.getTimestamp());
assertEquals(Type.Put.getCode(), offheapKV.getTypeByte());
}
@Test
public void testByteBufferBackedKeyValueWithTags() throws Exception {
KeyValue kvCell = new KeyValue(row1, fam1, qual1, 0L, Type.Put, row1, tags);
ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), true);
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
offheapKV.getRowPositionInByteBuffer(), offheapKV.getRowLength()));
assertEquals(
FAM1,
ByteBufferUtils.toStringBinary(offheapKV.getFamilyByteBuffer(),
offheapKV.getFamilyPositionInByteBuffer(), offheapKV.getFamilyLength()));
assertEquals(
QUAL1,
ByteBufferUtils.toStringBinary(offheapKV.getQualifierByteBuffer(),
offheapKV.getQualifierPositionInByteBuffer(), offheapKV.getQualifierLength()));
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKV.getValueByteBuffer(),
offheapKV.getValuePositionInByteBuffer(), offheapKV.getValueLength()));
assertEquals(0L, offheapKV.getTimestamp());
assertEquals(Type.Put.getCode(), offheapKV.getTypeByte());
// TODO : change Tag to handle both onheap and offheap cells
List<Tag> resTags =
Tag.asList(offheapKV.getTagsArray(), offheapKV.getTagsOffset(), offheapKV.getTagsLength());
Tag tag1 = resTags.get(0);
assertEquals(t1.getType(), tag1.getType());
assertEquals(Bytes.toString(t1.getValue()), Bytes.toString(getTagValue(tag1)));
Tag tag2 = resTags.get(1);
assertEquals(t2.getType(), tag2.getType());
assertEquals(Bytes.toString(t2.getValue()), Bytes.toString(getTagValue(tag2)));
Tag res = Tag.getTag(offheapKV.getTagsArray(), 0, offheapKV.getTagsLength(), (byte) 2);
assertEquals(Bytes.toString(t2.getValue()), Bytes.toString(getTagValue(res)));
res = Tag.getTag(offheapKV.getTagsArray(), 0, offheapKV.getTagsLength(), (byte) 3);
assertNull(res);
}
@Test
public void testGetKeyMethods() throws Exception {
KeyValue kvCell = new KeyValue(row1, fam1, qual1, 0L, Type.Put, row1, tags);
ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getKeyLength());
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), kvCell.getKeyOffset(),
kvCell.getKeyLength());
ByteBufferedCell offheapKeyOnlyKV = new ByteBufferedKeyOnlyKeyValue(buf, 0, buf.capacity());
assertEquals(
ROW1,
ByteBufferUtils.toStringBinary(offheapKeyOnlyKV.getRowByteBuffer(),
offheapKeyOnlyKV.getRowPositionInByteBuffer(), offheapKeyOnlyKV.getRowLength()));
assertEquals(
FAM1,
ByteBufferUtils.toStringBinary(offheapKeyOnlyKV.getFamilyByteBuffer(),
offheapKeyOnlyKV.getFamilyPositionInByteBuffer(), offheapKeyOnlyKV.getFamilyLength()));
assertEquals(
QUAL1,
ByteBufferUtils.toStringBinary(offheapKeyOnlyKV.getQualifierByteBuffer(),
offheapKeyOnlyKV.getQualifierPositionInByteBuffer(),
offheapKeyOnlyKV.getQualifierLength()));
assertEquals(0L, offheapKeyOnlyKV.getTimestamp());
assertEquals(Type.Put.getCode(), offheapKeyOnlyKV.getTypeByte());
}
// TODO : Can be moved to TagUtil
private static byte[] getTagValue(Tag tag) {
int tagLength = tag.getTagLength();
byte[] tagBytes = new byte[tagLength];
System.arraycopy(tag.getBuffer(), tag.getTagOffset(), tagBytes, 0, tagLength);
return tagBytes;
}
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -44,7 +45,7 @@ public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue {
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
writeInt(out, this.length);
ByteBufferUtils.putInt(out, this.length);
out.write(this.bytes, this.offset, this.length);
return this.length + Bytes.SIZEOF_INT;
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@ -728,7 +729,7 @@ public class HFileBlockIndex {
// If we imagine that keys[-1] = -Infinity and
// keys[numEntries] = Infinity, then we are maintaining an invariant that
// keys[low - 1] < key < keys[high + 1] while narrowing down the range.
KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
ByteBufferedKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferedKeyOnlyKeyValue();
Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
while (low <= high) {
mid = (low + high) >>> 1;
@ -753,9 +754,8 @@ public class HFileBlockIndex {
// done after HBASE-12224 & HBASE-12282
// TODO avoid array call.
nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
nonRootIndexKV.setKey(pair.getFirst().array(),
pair.getFirst().arrayOffset() + pair.getSecond(), midLength);
int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexKV);
nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexkeyOnlyKV);
// key lives above the midpoint
if (cmp > 0)

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.HConstants;
@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.Pair;
@ -464,7 +467,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
protected volatile int blockFetches;
protected final HFile.Reader reader;
private int currTagsLen;
private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
// buffer backed keyonlyKV
private ByteBufferedKeyOnlyKeyValue bufBackedKeyOnlyKv = new ByteBufferedKeyOnlyKeyValue();
// A pair for reusing in blockSeek() so that we don't create a lot of garbage objects
final Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
@ -675,10 +679,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
offsetFromPos += Bytes.SIZEOF_LONG;
blockBuffer.asSubByteBuffer(blockBuffer.position() + offsetFromPos, klen, pair);
// TODO :change here after Bufferbackedcells come
keyOnlyKv.setKey(pair.getFirst().array(), pair.getFirst().arrayOffset() + pair.getSecond(),
klen);
int comp = reader.getComparator().compareKeyIgnoresMvcc(key, keyOnlyKv);
bufBackedKeyOnlyKv.setKey(pair.getFirst(), pair.getSecond(), klen);
int comp = reader.getComparator().compareKeyIgnoresMvcc(key, bufBackedKeyOnlyKv);
offsetFromPos += klen + vlen;
if (this.reader.getFileContext().isIncludesTags()) {
// Read short as unsigned, high byte first
@ -888,30 +890,43 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (!isSeeked())
return null;
KeyValue ret;
// TODO : reduce the varieties of KV here. Check if based on a boolean
// we can handle the 'no tags' case
// TODO : Handle MBB here
if (currTagsLen > 0) {
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
Cell ret;
int cellBufSize = getCellBufSize();
if (blockBuffer.hasArray()) {
// TODO : reduce the varieties of KV here. Check if based on a boolean
// we can handle the 'no tags' case.
if (currTagsLen > 0) {
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
} else {
ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), cellBufSize);
}
} else {
ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
} else {
ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), cellBufSize);
}
}
} else {
ByteBuffer buf = blockBuffer.asSubByteBuffer(cellBufSize);
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
ret = new ShareableMemoryOffheapKeyValue(buf, buf.position(), cellBufSize,
currTagsLen > 0);
} else {
ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
ret = new OffheapKeyValue(buf, buf.position(), cellBufSize, currTagsLen > 0);
}
}
if (this.reader.shouldIncludeMemstoreTS()) {
ret.setSequenceId(currMemstoreTS);
try {
CellUtil.setSequenceId(ret, currMemstoreTS);
} catch (IOException e) {
// will not happen
}
}
return ret;
}
@ -919,9 +934,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public Cell getKey() {
assertSeeked();
return new KeyValue.KeyOnlyKeyValue(blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE, currKeyLen);
// Create a new object so that the key returned by this getKey can be cached as firstKey/lastKey
Pair<ByteBuffer, Integer> keyPair = new Pair<ByteBuffer, Integer>();
blockBuffer.asSubByteBuffer(blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen, keyPair);
ByteBuffer keyBuf = keyPair.getFirst();
if (keyBuf.hasArray()) {
return new KeyValue.KeyOnlyKeyValue(keyBuf.array(), keyBuf.arrayOffset()
+ keyPair.getSecond(), currKeyLen);
} else {
return new ByteBufferedKeyOnlyKeyValue(keyBuf, keyPair.getSecond(), currKeyLen);
}
}
private static class ShareableMemoryKeyValue extends SizeCachedKeyValue implements
@ -950,14 +972,32 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
}
private static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements
ShareableMemory {
public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length,
boolean hasTags) {
super(buf, offset, length, hasTags);
}
@Override
public Cell cloneToCell() {
byte[] copy = new byte[this.length];
ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
return new SizeCachedKeyValue(copy, 0, copy.length);
}
}
@Override
public ByteBuffer getValue() {
assertSeeked();
// TODO : change here after BufferBacked cells come
return ByteBuffer.wrap(
blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
// OK to create a new Pair. Not used in the hot path
Pair<ByteBuffer, Integer> valuePair = new Pair<ByteBuffer, Integer>();
this.blockBuffer.asSubByteBuffer(blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
currValueLen, valuePair);
ByteBuffer valBuf = valuePair.getFirst().duplicate();
valBuf.position(valuePair.getSecond());
valBuf.limit(currValueLen + valuePair.getSecond());
return valBuf.slice();
}
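The duplicate/position/limit/slice sequence above is the standard zero-copy idiom for exposing a sub-range of a ByteBuffer without disturbing the source buffer's own position and limit; in isolation (offsets are illustrative):

import java.nio.ByteBuffer;

public class SliceSketch {
  public static void main(String[] args) {
    ByteBuffer block = ByteBuffer.wrap("keyVALUEtail".getBytes());
    int valuePos = 3, valueLen = 5;         // illustrative offsets into the block
    ByteBuffer dup = block.duplicate();     // shares bytes, independent position/limit
    dup.position(valuePos);
    dup.limit(valuePos + valueLen);
    ByteBuffer value = dup.slice();         // zero-copy view over "VALUE"
    System.out.println(value.remaining());  // prints 5; 'block' is untouched
  }
}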
protected void setNonSeekedState() {
@ -1151,32 +1191,28 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
int klen = buffer.getInt();
buffer.skip(Bytes.SIZEOF_INT);// Skip value len part
ByteBuffer keyBuff = buffer.asSubByteBuffer(klen);
keyBuff.limit(keyBuff.position() + klen);
// Create a KeyOnlyKv now.
// TODO : Will change when Buffer backed cells come
return new KeyValue.KeyOnlyKeyValue(keyBuff.array(), keyBuff.arrayOffset()
+ keyBuff.position(), klen);
if (keyBuff.hasArray()) {
return new KeyValue.KeyOnlyKeyValue(keyBuff.array(), keyBuff.arrayOffset()
+ keyBuff.position(), klen);
} else {
return new ByteBufferedKeyOnlyKeyValue(keyBuff, keyBuff.position(), klen);
}
}
@Override
public String getKeyString() {
return Bytes.toStringBinary(blockBuffer.array(),
blockBuffer.arrayOffset() + blockBuffer.position()
+ KEY_VALUE_LEN_SIZE, currKeyLen);
return CellUtil.toString(getKey(), false);
}
@Override
public String getValueString() {
return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
currValueLen);
return ByteBufferUtils.toStringBinary(getValue());
}
public int compareKey(CellComparator comparator, Cell key) {
this.keyOnlyKv.setKey(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
return comparator.compareKeyIgnoresMvcc(
key, this.keyOnlyKv);
blockBuffer.asSubByteBuffer(blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen, pair);
this.bufBackedKeyOnlyKv.setKey(pair.getFirst(), pair.getSecond(), currKeyLen);
return comparator.compareKeyIgnoresMvcc(key, this.bufBackedKeyOnlyKv);
}
@Override
@ -1534,7 +1570,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
private final HFileBlockDecodingContext decodingCtx;
private final DataBlockEncoder.EncodedSeeker seeker;
private final DataBlockEncoder dataBlockEncoder;
private final HFileContext meta;
public EncodedScanner(HFile.Reader reader, boolean cacheBlocks,
boolean pread, boolean isCompaction, HFileContext meta) {
@ -1544,7 +1579,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
seeker = dataBlockEncoder.createSeeker(
reader.getComparator(), decodingCtx);
this.meta = meta;
}
@Override
@ -1644,8 +1678,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public String getValueString() {
ByteBuffer valueBuffer = getValue();
return Bytes.toStringBinary(valueBuffer.array(),
valueBuffer.arrayOffset(), valueBuffer.limit());
return ByteBufferUtils.toStringBinary(valueBuffer);
}
private void assertValidSeek() {
@ -1708,22 +1741,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return true; // We load file info in constructor in version 2.
}
/**
* Validates that the minor version is within acceptable limits.
* Otherwise throws an Runtime exception
*/
private void validateMinorVersion(Path path, int minorVersion) {
if (minorVersion < MIN_MINOR_VERSION ||
minorVersion > MAX_MINOR_VERSION) {
String msg = "Minor version for path " + path +
" is expected to be between " +
MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
" but is found to be " + minorVersion;
LOG.error(msg);
throw new RuntimeException(msg);
}
}
@Override
public HFileContext getFileContext() {
return hfileContext;

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferArray;
import org.apache.hadoop.hbase.util.Pair;
@ -69,22 +68,14 @@ public class ByteBufferIOEngine implements IOEngine {
@Override
public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
// TODO : this allocate and copy will go away once we create BB backed cells
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
dstBuffer.arrayOffset());
ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
// Here the buffer that is created directly refers to the buffer in the actual buckets.
// When any cell is referring to the blocks created out of these buckets then it means that
// those cells are referring to a shared memory area which if evicted by the BucketCache would
// lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
// so that the readers using this block are aware of this fact and do the necessary action
// to prevent eviction till the results are either consumed or copied
if (dstBuffer.limit() != length) {
throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+ " expected");
}
// TODO : to be removed - make it conditional
return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.SHARED);
return new Pair<ByteBuff, MemoryType>(dstBuffer, MemoryType.SHARED);
}
/**

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.util;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Random;

View File

@ -107,10 +107,6 @@ public class TestByteBufferIOEngine {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
ioEngine.write(srcBuffer, offset);
//ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
//ioEngine.read(dstBuffer, offset);
//MultiByteBuffer read = new MultiByteBuffer(dstBuffer);
// TODO : this will get changed after HBASE-12295 goes in
Pair<ByteBuff, MemoryType> read = ioEngine.read(offset, blockSize);
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(srcBuffer.get(j) == read.getFirst().get(j));