HBASE-16213 A new HFileBlock structure for fast random get. (binlijin)
This commit is contained in:
parent
b1ee8a88c3
commit
0d99e827b2
|
@ -16,15 +16,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.encoding;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* Provide access to all data block encoding algorithms. All of the algorithms
|
||||
* are required to have unique id which should <b>NEVER</b> be changed. If you
|
||||
|
@ -43,7 +43,8 @@ public enum DataBlockEncoding {
|
|||
FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"),
|
||||
// id 5 is reserved for the COPY_KEY algorithm for benchmarking
|
||||
// COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"),
|
||||
PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
|
||||
PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"),
|
||||
ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.RowIndexCodecV1");
|
||||
|
||||
private final short id;
|
||||
private final byte[] idInBytes;
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.encoding;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.nio.SingleByteBuff;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
/**
|
||||
* Store cells following every row's start offset, so we can binary search to a row's cells.
|
||||
*
|
||||
* Format:
|
||||
* flat cells
|
||||
* integer: number of rows
|
||||
* integer: row0's offset
|
||||
* integer: row1's offset
|
||||
* ....
|
||||
* integer: dataSize
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RowIndexCodecV1 implements DataBlockEncoder {
|
||||
|
||||
private static class RowIndexEncodingState extends EncodingState {
|
||||
RowIndexEncoderV1 encoder = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
|
||||
DataOutputStream out) throws IOException {
|
||||
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
|
||||
throw new IOException(this.getClass().getName() + " only accepts "
|
||||
+ HFileBlockDefaultEncodingContext.class.getName() + " as the "
|
||||
+ "encoding context.");
|
||||
}
|
||||
|
||||
HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
|
||||
encodingCtx.prepareEncoding(out);
|
||||
|
||||
RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
|
||||
RowIndexEncodingState state = new RowIndexEncodingState();
|
||||
state.encoder = encoder;
|
||||
blkEncodingCtx.setEncodingState(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
|
||||
DataOutputStream out) throws IOException {
|
||||
RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
|
||||
.getEncodingState();
|
||||
RowIndexEncoderV1 encoder = state.encoder;
|
||||
return encoder.write(cell);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
|
||||
DataOutputStream out, byte[] uncompressedBytesWithHeader)
|
||||
throws IOException {
|
||||
RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
|
||||
.getEncodingState();
|
||||
RowIndexEncoderV1 encoder = state.encoder;
|
||||
encoder.flush();
|
||||
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
|
||||
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
|
||||
} else {
|
||||
encodingCtx.postEncoding(BlockType.DATA);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer decodeKeyValues(DataInputStream source,
|
||||
HFileBlockDecodingContext decodingCtx) throws IOException {
|
||||
ByteBuffer sourceAsBuffer = ByteBufferUtils
|
||||
.drainInputStreamToBuffer(source);// waste
|
||||
sourceAsBuffer.mark();
|
||||
if (!decodingCtx.getHFileContext().isIncludesTags()) {
|
||||
sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
|
||||
int onDiskSize = sourceAsBuffer.getInt();
|
||||
sourceAsBuffer.reset();
|
||||
ByteBuffer dup = sourceAsBuffer.duplicate();
|
||||
dup.position(sourceAsBuffer.position());
|
||||
dup.limit(sourceAsBuffer.position() + onDiskSize);
|
||||
return dup.slice();
|
||||
} else {
|
||||
RowIndexSeekerV1 seeker = new RowIndexSeekerV1(CellComparator.COMPARATOR,
|
||||
decodingCtx);
|
||||
seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
kvs.add(seeker.getCell());
|
||||
while (seeker.next()) {
|
||||
kvs.add(seeker.getCell());
|
||||
}
|
||||
boolean includesMvcc = decodingCtx.getHFileContext().isIncludesMvcc();
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
DataOutputStream out = new DataOutputStream(baos);
|
||||
for (Cell cell : kvs) {
|
||||
KeyValue currentCell = KeyValueUtil.copyToNewKeyValue(cell);
|
||||
out.write(currentCell.getBuffer(), currentCell.getOffset(),
|
||||
currentCell.getLength());
|
||||
if (includesMvcc) {
|
||||
WritableUtils.writeVLong(out, cell.getSequenceId());
|
||||
}
|
||||
}
|
||||
out.flush();
|
||||
return ByteBuffer.wrap(baos.getBuffer(), 0, baos.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||
DataBlockEncoding encoding, byte[] header, HFileContext meta) {
|
||||
return new HFileBlockDefaultEncodingContext(encoding, header, meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
|
||||
return new HFileBlockDefaultDecodingContext(meta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell getFirstKeyCellInBlock(ByteBuff block) {
|
||||
block.mark();
|
||||
int keyLength = block.getInt();
|
||||
block.getInt();
|
||||
ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
|
||||
block.reset();
|
||||
return createFirstKeyCell(key, keyLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EncodedSeeker createSeeker(CellComparator comparator,
|
||||
HFileBlockDecodingContext decodingCtx) {
|
||||
return new RowIndexSeekerV1(comparator, decodingCtx);
|
||||
}
|
||||
|
||||
protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
|
||||
if (key.hasArray()) {
|
||||
return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset()
|
||||
+ key.position(), keyLength);
|
||||
} else {
|
||||
return new ByteBufferedKeyOnlyKeyValue(key, key.position(), keyLength);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.encoding;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RowIndexEncoderV1 {
|
||||
private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class);
|
||||
|
||||
/** The Cell previously appended. */
|
||||
private Cell lastCell = null;
|
||||
|
||||
private DataOutputStream out;
|
||||
private HFileBlockDefaultEncodingContext encodingCtx;
|
||||
private int startOffset = -1;
|
||||
private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(
|
||||
64 * 4);
|
||||
|
||||
public RowIndexEncoderV1(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) {
|
||||
this.out = out;
|
||||
this.encodingCtx = encodingCtx;
|
||||
}
|
||||
|
||||
public int write(Cell cell) throws IOException {
|
||||
// checkRow uses comparator to check we are writing in order.
|
||||
if (!checkRow(cell)) {
|
||||
if (startOffset < 0) {
|
||||
startOffset = out.size();
|
||||
}
|
||||
rowsOffsetBAOS.writeInt(out.size() - startOffset);
|
||||
}
|
||||
int klength = KeyValueUtil.keyLength(cell);
|
||||
int vlength = cell.getValueLength();
|
||||
out.writeInt(klength);
|
||||
out.writeInt(vlength);
|
||||
CellUtil.writeFlatKey(cell, out);
|
||||
// Write the value part
|
||||
CellUtil.writeValue(out, cell, vlength);
|
||||
int encodedKvSize = klength + vlength
|
||||
+ KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||
// Write the additional tag into the stream
|
||||
if (encodingCtx.getHFileContext().isIncludesTags()) {
|
||||
int tagsLength = cell.getTagsLength();
|
||||
out.writeShort(tagsLength);
|
||||
if (tagsLength > 0) {
|
||||
CellUtil.writeTags(out, cell, tagsLength);
|
||||
}
|
||||
encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
|
||||
}
|
||||
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
|
||||
WritableUtils.writeVLong(out, cell.getSequenceId());
|
||||
encodedKvSize += WritableUtils.getVIntSize(cell.getSequenceId());
|
||||
}
|
||||
lastCell = cell;
|
||||
return encodedKvSize;
|
||||
}
|
||||
|
||||
protected boolean checkRow(final Cell cell) throws IOException {
|
||||
boolean isDuplicateRow = false;
|
||||
if (cell == null) {
|
||||
throw new IOException("Key cannot be null or empty");
|
||||
}
|
||||
if (lastCell != null) {
|
||||
int keyComp = CellComparator.COMPARATOR.compareRows(lastCell, cell);
|
||||
if (keyComp > 0) {
|
||||
throw new IOException("Added a key not lexically larger than"
|
||||
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
|
||||
} else if (keyComp == 0) {
|
||||
isDuplicateRow = true;
|
||||
}
|
||||
}
|
||||
return isDuplicateRow;
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
int onDiskDataSize = 0;
|
||||
if (startOffset >= 0) {
|
||||
onDiskDataSize = out.size() - startOffset;
|
||||
}
|
||||
out.writeInt(rowsOffsetBAOS.size() / 4);
|
||||
if (rowsOffsetBAOS.size() > 0) {
|
||||
out.write(rowsOffsetBAOS.getBuffer(), 0, rowsOffsetBAOS.size());
|
||||
}
|
||||
out.writeInt(onDiskDataSize);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("RowNumber: " + rowsOffsetBAOS.size() / 4
|
||||
+ ", onDiskDataSize: " + onDiskDataSize + ", totalOnDiskSize: "
|
||||
+ (out.size() - startOffset));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,413 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.encoding;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.ByteBufferedCell;
|
||||
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.OffheapKeyValue;
|
||||
import org.apache.hadoop.hbase.SizeCachedKeyValue;
|
||||
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ObjectIntPair;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RowIndexSeekerV1 implements EncodedSeeker {
|
||||
|
||||
private HFileBlockDecodingContext decodingCtx;
|
||||
private final CellComparator comparator;
|
||||
|
||||
// A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
|
||||
// many object creations.
|
||||
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<ByteBuffer>();
|
||||
|
||||
private ByteBuff currentBuffer;
|
||||
private SeekerState current = new SeekerState(); // always valid
|
||||
private SeekerState previous = new SeekerState(); // may not be valid
|
||||
|
||||
private int rowNumber;
|
||||
private ByteBuff rowOffsets = null;
|
||||
|
||||
public RowIndexSeekerV1(CellComparator comparator,
|
||||
HFileBlockDecodingContext decodingCtx) {
|
||||
this.comparator = comparator;
|
||||
this.decodingCtx = decodingCtx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCurrentBuffer(ByteBuff buffer) {
|
||||
int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
|
||||
|
||||
// Data part
|
||||
ByteBuff dup = buffer.duplicate();
|
||||
dup.position(buffer.position());
|
||||
dup.limit(buffer.position() + onDiskSize);
|
||||
currentBuffer = dup.slice();
|
||||
current.currentBuffer = currentBuffer;
|
||||
buffer.skip(onDiskSize);
|
||||
|
||||
// Row offset
|
||||
rowNumber = buffer.getInt();
|
||||
int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
|
||||
ByteBuff rowDup = buffer.duplicate();
|
||||
rowDup.position(buffer.position());
|
||||
rowDup.limit(buffer.position() + totalRowOffsetsLength);
|
||||
rowOffsets = rowDup.slice();
|
||||
|
||||
decodeFirst();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell getKey() {
|
||||
if (current.keyBuffer.hasArray()) {
|
||||
return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
|
||||
current.keyBuffer.arrayOffset() + current.keyBuffer.position(),
|
||||
current.keyLength);
|
||||
} else {
|
||||
byte[] key = new byte[current.keyLength];
|
||||
ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer,
|
||||
current.keyBuffer.position(), 0, current.keyLength);
|
||||
return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer getValueShallowCopy() {
|
||||
currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength,
|
||||
tmpPair);
|
||||
ByteBuffer dup = tmpPair.getFirst().duplicate();
|
||||
dup.position(tmpPair.getSecond());
|
||||
dup.limit(tmpPair.getSecond() + current.valueLength);
|
||||
return dup.slice();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell getCell() {
|
||||
return current.toCell();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rewind() {
|
||||
currentBuffer.rewind();
|
||||
decodeFirst();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean next() {
|
||||
if (!currentBuffer.hasRemaining()) {
|
||||
return false;
|
||||
}
|
||||
decodeNext();
|
||||
previous.invalidate();
|
||||
return true;
|
||||
}
|
||||
|
||||
private int binarySearch(Cell seekCell, boolean seekBefore) {
|
||||
int low = 0;
|
||||
int high = rowNumber - 1;
|
||||
int mid = (low + high) >>> 1;
|
||||
int comp = 0;
|
||||
while (low <= high) {
|
||||
mid = (low + high) >>> 1;
|
||||
ByteBuffer row = getRow(mid);
|
||||
comp = compareRows(row, seekCell);
|
||||
if (comp < 0) {
|
||||
low = mid + 1;
|
||||
} else if (comp > 0) {
|
||||
high = mid - 1;
|
||||
} else {
|
||||
// key found
|
||||
if (seekBefore) {
|
||||
return mid - 1;
|
||||
} else {
|
||||
return mid;
|
||||
}
|
||||
}
|
||||
}
|
||||
// key not found.
|
||||
if (comp > 0) {
|
||||
return mid - 1;
|
||||
} else {
|
||||
return mid;
|
||||
}
|
||||
}
|
||||
|
||||
private int compareRows(ByteBuffer row, Cell seekCell) {
|
||||
if (seekCell instanceof ByteBufferedCell) {
|
||||
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
|
||||
((ByteBufferedCell) seekCell).getRowByteBuffer(),
|
||||
((ByteBufferedCell) seekCell).getRowPosition(),
|
||||
seekCell.getRowLength());
|
||||
} else {
|
||||
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
|
||||
seekCell.getRowArray(), seekCell.getRowOffset(),
|
||||
seekCell.getRowLength());
|
||||
}
|
||||
}
|
||||
|
||||
private ByteBuffer getRow(int index) {
|
||||
int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
|
||||
ByteBuff block = currentBuffer.duplicate();
|
||||
block.position(offset + Bytes.SIZEOF_LONG);
|
||||
short rowLen = block.getShort();
|
||||
block.asSubByteBuffer(block.position(), rowLen, tmpPair);
|
||||
ByteBuffer row = tmpPair.getFirst();
|
||||
row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
|
||||
return row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
|
||||
previous.invalidate();
|
||||
int index = binarySearch(seekCell, seekBefore);
|
||||
if (index < 0) {
|
||||
return HConstants.INDEX_KEY_MAGIC; // using optimized index key
|
||||
} else {
|
||||
int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
|
||||
if (offset != 0) {
|
||||
decodeAtPosition(offset);
|
||||
}
|
||||
}
|
||||
do {
|
||||
int comp;
|
||||
comp = comparator.compareKeyIgnoresMvcc(seekCell, current.currentKey);
|
||||
if (comp == 0) { // exact match
|
||||
if (seekBefore) {
|
||||
if (!previous.isValid()) {
|
||||
// The caller (seekBefore) has to ensure that we are not at the
|
||||
// first key in the block.
|
||||
throw new IllegalStateException("Cannot seekBefore if "
|
||||
+ "positioned at the first key in the block: key="
|
||||
+ Bytes.toStringBinary(seekCell.getRowArray()));
|
||||
}
|
||||
moveToPrevious();
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (comp < 0) { // already too large, check previous
|
||||
if (previous.isValid()) {
|
||||
moveToPrevious();
|
||||
} else {
|
||||
return HConstants.INDEX_KEY_MAGIC; // using optimized index key
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
// move to next, if more data is available
|
||||
if (currentBuffer.hasRemaining()) {
|
||||
previous.copyFromNext(current);
|
||||
decodeNext();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while (true);
|
||||
|
||||
// we hit the end of the block, not an exact match
|
||||
return 1;
|
||||
}
|
||||
|
||||
private void moveToPrevious() {
|
||||
if (!previous.isValid()) {
|
||||
throw new IllegalStateException("Can move back only once and not in first key in the block.");
|
||||
}
|
||||
|
||||
SeekerState tmp = previous;
|
||||
previous = current;
|
||||
current = tmp;
|
||||
|
||||
// move after last key value
|
||||
currentBuffer.position(current.nextKvOffset);
|
||||
previous.invalidate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareKey(CellComparator comparator, Cell key) {
|
||||
return comparator.compareKeyIgnoresMvcc(key, current.currentKey);
|
||||
}
|
||||
|
||||
protected void decodeFirst() {
|
||||
decodeNext();
|
||||
previous.invalidate();
|
||||
}
|
||||
|
||||
protected void decodeAtPosition(int position) {
|
||||
currentBuffer.position(position);
|
||||
decodeNext();
|
||||
previous.invalidate();
|
||||
}
|
||||
|
||||
protected void decodeNext() {
|
||||
current.startOffset = currentBuffer.position();
|
||||
long ll = currentBuffer.getLongAfterPosition(0);
|
||||
// Read top half as an int of key length and bottom int as value length
|
||||
current.keyLength = (int) (ll >> Integer.SIZE);
|
||||
current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
|
||||
currentBuffer.skip(Bytes.SIZEOF_LONG);
|
||||
// key part
|
||||
currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength,
|
||||
tmpPair);
|
||||
ByteBuffer key = tmpPair.getFirst().duplicate();
|
||||
key.position(tmpPair.getSecond()).limit(
|
||||
tmpPair.getSecond() + current.keyLength);
|
||||
current.keyBuffer = key;
|
||||
currentBuffer.skip(current.keyLength);
|
||||
// value part
|
||||
current.valueOffset = currentBuffer.position();
|
||||
currentBuffer.skip(current.valueLength);
|
||||
if (includesTags()) {
|
||||
decodeTags();
|
||||
}
|
||||
if (includesMvcc()) {
|
||||
current.memstoreTS = ByteBuff.readVLong(currentBuffer);
|
||||
} else {
|
||||
current.memstoreTS = 0;
|
||||
}
|
||||
current.nextKvOffset = currentBuffer.position();
|
||||
current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(),
|
||||
current.keyLength);
|
||||
}
|
||||
|
||||
protected boolean includesMvcc() {
|
||||
return this.decodingCtx.getHFileContext().isIncludesMvcc();
|
||||
}
|
||||
|
||||
protected boolean includesTags() {
|
||||
return this.decodingCtx.getHFileContext().isIncludesTags();
|
||||
}
|
||||
|
||||
protected void decodeTags() {
|
||||
current.tagsLength = currentBuffer.getShortAfterPosition(0);
|
||||
currentBuffer.skip(Bytes.SIZEOF_SHORT);
|
||||
current.tagsOffset = currentBuffer.position();
|
||||
currentBuffer.skip(current.tagsLength);
|
||||
}
|
||||
|
||||
private class SeekerState {
|
||||
/**
|
||||
* The size of a (key length, value length) tuple that prefixes each entry
|
||||
* in a data block.
|
||||
*/
|
||||
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
|
||||
|
||||
protected ByteBuff currentBuffer;
|
||||
protected int startOffset = -1;
|
||||
protected int valueOffset = -1;
|
||||
protected int keyLength;
|
||||
protected int valueLength;
|
||||
protected int tagsLength = 0;
|
||||
protected int tagsOffset = -1;
|
||||
|
||||
protected ByteBuffer keyBuffer = null;
|
||||
protected long memstoreTS;
|
||||
protected int nextKvOffset;
|
||||
// buffer backed keyonlyKV
|
||||
private ByteBufferedKeyOnlyKeyValue currentKey = new ByteBufferedKeyOnlyKeyValue();
|
||||
|
||||
protected boolean isValid() {
|
||||
return valueOffset != -1;
|
||||
}
|
||||
|
||||
protected void invalidate() {
|
||||
valueOffset = -1;
|
||||
currentKey = new ByteBufferedKeyOnlyKeyValue();
|
||||
currentBuffer = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy the state from the next one into this instance (the previous state placeholder). Used to
|
||||
* save the previous state when we are advancing the seeker to the next key/value.
|
||||
*/
|
||||
protected void copyFromNext(SeekerState nextState) {
|
||||
keyBuffer = nextState.keyBuffer;
|
||||
currentKey.setKey(nextState.keyBuffer,
|
||||
nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT,
|
||||
nextState.keyLength);
|
||||
|
||||
startOffset = nextState.startOffset;
|
||||
valueOffset = nextState.valueOffset;
|
||||
keyLength = nextState.keyLength;
|
||||
valueLength = nextState.valueLength;
|
||||
nextKvOffset = nextState.nextKvOffset;
|
||||
memstoreTS = nextState.memstoreTS;
|
||||
currentBuffer = nextState.currentBuffer;
|
||||
tagsOffset = nextState.tagsOffset;
|
||||
tagsLength = nextState.tagsLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return CellUtil.getCellKeyAsString(toCell());
|
||||
}
|
||||
|
||||
protected int getCellBufSize() {
|
||||
int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
|
||||
if (includesTags()) {
|
||||
kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
|
||||
}
|
||||
return kvBufSize;
|
||||
}
|
||||
|
||||
public Cell toCell() {
|
||||
Cell ret;
|
||||
int cellBufSize = getCellBufSize();
|
||||
long seqId = 0l;
|
||||
if (includesMvcc()) {
|
||||
seqId = memstoreTS;
|
||||
}
|
||||
if (currentBuffer.hasArray()) {
|
||||
// TODO : reduce the varieties of KV here. Check if based on a boolean
|
||||
// we can handle the 'no tags' case.
|
||||
if (tagsLength > 0) {
|
||||
ret = new SizeCachedKeyValue(currentBuffer.array(),
|
||||
currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
|
||||
} else {
|
||||
ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
|
||||
currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
|
||||
}
|
||||
} else {
|
||||
currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
|
||||
ByteBuffer buf = tmpPair.getFirst();
|
||||
if (buf.isDirect()) {
|
||||
ret = new OffheapKeyValue(buf, tmpPair.getSecond(), cellBufSize,
|
||||
tagsLength > 0, seqId);
|
||||
} else {
|
||||
if (tagsLength > 0) {
|
||||
ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset()
|
||||
+ tmpPair.getSecond(), cellBufSize, seqId);
|
||||
} else {
|
||||
ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset()
|
||||
+ tmpPair.getSecond(), cellBufSize, seqId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -102,7 +102,7 @@ public class TestSeekToBlockWithEncoders {
|
|||
KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
Bytes.toBytes("val"));
|
||||
sampleKv.add(kv4);
|
||||
KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
KeyValue kv5 = new KeyValue(Bytes.toBytes("aaddd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
Bytes.toBytes("val"));
|
||||
sampleKv.add(kv5);
|
||||
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
|
@ -125,7 +125,7 @@ public class TestSeekToBlockWithEncoders {
|
|||
KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
Bytes.toBytes("val"));
|
||||
sampleKv.add(kv3);
|
||||
KeyValue kv4 = new KeyValue(Bytes.toBytes("aaae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
KeyValue kv4 = new KeyValue(Bytes.toBytes("aade"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
Bytes.toBytes("val"));
|
||||
sampleKv.add(kv4);
|
||||
KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||
|
|
Loading…
Reference in New Issue