HBASE-16213 A new HFileBlock structure for fast random get. (binlijin)
This commit is contained in:
parent
35fa341913
commit
c899897bc8
|
@ -0,0 +1,127 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.BufferOverflowException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Our own implementation of ByteArrayOutputStream where all methods are NOT
|
||||||
|
* synchronized and supports writing ByteBuffer directly to it.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ByteArrayOutputStream extends OutputStream {
|
||||||
|
|
||||||
|
// Borrowed from openJDK:
|
||||||
|
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
|
||||||
|
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
|
||||||
|
|
||||||
|
private byte[] buf;
|
||||||
|
private int pos = 0;
|
||||||
|
|
||||||
|
public ByteArrayOutputStream() {
|
||||||
|
this(32);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ByteArrayOutputStream(int capacity) {
|
||||||
|
this.buf = new byte[capacity];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes an <code>int</code> to the underlying output stream as four
|
||||||
|
* bytes, high byte first.
|
||||||
|
* @param i the <code>int</code> to write
|
||||||
|
* @throws IOException if an I/O error occurs.
|
||||||
|
*/
|
||||||
|
public void writeInt(int i) throws IOException {
|
||||||
|
checkSizeAndGrow(Bytes.SIZEOF_INT);
|
||||||
|
Bytes.putInt(this.buf, this.pos, i);
|
||||||
|
this.pos += Bytes.SIZEOF_INT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(int b) throws IOException {
|
||||||
|
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
|
||||||
|
buf[this.pos] = (byte) b;
|
||||||
|
this.pos++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(byte[] b, int off, int len) throws IOException {
|
||||||
|
checkSizeAndGrow(len);
|
||||||
|
System.arraycopy(b, off, this.buf, this.pos, len);
|
||||||
|
this.pos += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkSizeAndGrow(int extra) {
|
||||||
|
long capacityNeeded = this.pos + (long) extra;
|
||||||
|
if (capacityNeeded > this.buf.length) {
|
||||||
|
// guarantee it's possible to fit
|
||||||
|
if (capacityNeeded > MAX_ARRAY_SIZE) {
|
||||||
|
throw new BufferOverflowException();
|
||||||
|
}
|
||||||
|
// double until hit the cap
|
||||||
|
long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE);
|
||||||
|
// but make sure there is enough if twice the existing capacity is still
|
||||||
|
// too small
|
||||||
|
nextCapacity = Math.max(nextCapacity, capacityNeeded);
|
||||||
|
if (nextCapacity > MAX_ARRAY_SIZE) {
|
||||||
|
throw new BufferOverflowException();
|
||||||
|
}
|
||||||
|
byte[] newBuf = new byte[(int) nextCapacity];
|
||||||
|
System.arraycopy(buf, 0, newBuf, 0, buf.length);
|
||||||
|
buf = newBuf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resets the <code>pos</code> field of this byte array output stream to zero.
|
||||||
|
* The output stream can be used again.
|
||||||
|
*/
|
||||||
|
public void reset() {
|
||||||
|
this.pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copies the content of this Stream into a new byte array.
|
||||||
|
*
|
||||||
|
* @return the contents of this output stream, as new byte array.
|
||||||
|
*/
|
||||||
|
public byte toByteArray()[] {
|
||||||
|
return Arrays.copyOf(buf, pos);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the underlying array where the data gets accumulated
|
||||||
|
*/
|
||||||
|
public byte[] getBuffer() {
|
||||||
|
return this.buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The current size of the buffer.
|
||||||
|
*/
|
||||||
|
public int size() {
|
||||||
|
return this.pos;
|
||||||
|
}
|
||||||
|
}
|
|
@ -43,7 +43,8 @@ public enum DataBlockEncoding {
|
||||||
FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"),
|
FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"),
|
||||||
// id 5 is reserved for the COPY_KEY algorithm for benchmarking
|
// id 5 is reserved for the COPY_KEY algorithm for benchmarking
|
||||||
// COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"),
|
// COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"),
|
||||||
PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
|
PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"),
|
||||||
|
ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.RowIndexCodecV1");
|
||||||
|
|
||||||
private final short id;
|
private final short id;
|
||||||
private final byte[] idInBytes;
|
private final byte[] idInBytes;
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.encoding;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store cells following every row's start offset, so we can binary search to a row's cells.
|
||||||
|
*
|
||||||
|
* Format:
|
||||||
|
* flat cells
|
||||||
|
* integer: number of rows
|
||||||
|
* integer: row0's offset
|
||||||
|
* integer: row1's offset
|
||||||
|
* ....
|
||||||
|
* integer: dataSize
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RowIndexCodecV1 implements DataBlockEncoder {
|
||||||
|
|
||||||
|
private static class RowIndexEncodingState extends EncodingState {
|
||||||
|
RowIndexEncoderV1 encoder = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
|
||||||
|
DataOutputStream out) throws IOException {
|
||||||
|
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
|
||||||
|
throw new IOException(this.getClass().getName() + " only accepts "
|
||||||
|
+ HFileBlockDefaultEncodingContext.class.getName() + " as the "
|
||||||
|
+ "encoding context.");
|
||||||
|
}
|
||||||
|
|
||||||
|
HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
|
||||||
|
encodingCtx.prepareEncoding(out);
|
||||||
|
|
||||||
|
RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
|
||||||
|
RowIndexEncodingState state = new RowIndexEncodingState();
|
||||||
|
state.encoder = encoder;
|
||||||
|
blkEncodingCtx.setEncodingState(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
|
||||||
|
DataOutputStream out) throws IOException {
|
||||||
|
RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
|
||||||
|
.getEncodingState();
|
||||||
|
RowIndexEncoderV1 encoder = state.encoder;
|
||||||
|
return encoder.write(cell);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
|
||||||
|
DataOutputStream out, byte[] uncompressedBytesWithHeader)
|
||||||
|
throws IOException {
|
||||||
|
RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
|
||||||
|
.getEncodingState();
|
||||||
|
RowIndexEncoderV1 encoder = state.encoder;
|
||||||
|
encoder.flush();
|
||||||
|
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
|
||||||
|
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
|
||||||
|
} else {
|
||||||
|
encodingCtx.postEncoding(BlockType.DATA);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer decodeKeyValues(DataInputStream source,
|
||||||
|
HFileBlockDecodingContext decodingCtx) throws IOException {
|
||||||
|
if (!decodingCtx.getHFileContext().isIncludesTags()) {
|
||||||
|
ByteBuffer sourceAsBuffer = ByteBufferUtils
|
||||||
|
.drainInputStreamToBuffer(source);// waste
|
||||||
|
sourceAsBuffer.mark();
|
||||||
|
sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
|
||||||
|
int onDiskSize = sourceAsBuffer.getInt();
|
||||||
|
sourceAsBuffer.reset();
|
||||||
|
ByteBuffer dup = sourceAsBuffer.duplicate();
|
||||||
|
dup.position(sourceAsBuffer.position());
|
||||||
|
dup.limit(sourceAsBuffer.position() + onDiskSize);
|
||||||
|
return dup.slice();
|
||||||
|
} else {
|
||||||
|
ByteBuffer sourceAsBuffer = ByteBufferUtils
|
||||||
|
.drainInputStreamToBuffer(source);// waste
|
||||||
|
sourceAsBuffer.mark();
|
||||||
|
RowIndexSeekerV1 seeker = new RowIndexSeekerV1(KeyValue.COMPARATOR,
|
||||||
|
decodingCtx);
|
||||||
|
seeker.setCurrentBuffer(sourceAsBuffer);
|
||||||
|
List<ByteBuffer> kvs = new ArrayList<ByteBuffer>();
|
||||||
|
kvs.add(seeker.getKeyValueBuffer());
|
||||||
|
while (seeker.next()) {
|
||||||
|
kvs.add(seeker.getKeyValueBuffer());
|
||||||
|
}
|
||||||
|
int totalLength = 0;
|
||||||
|
for (ByteBuffer buf : kvs) {
|
||||||
|
totalLength += buf.remaining();
|
||||||
|
}
|
||||||
|
byte[] keyValueBytes = new byte[totalLength];
|
||||||
|
ByteBuffer result = ByteBuffer.wrap(keyValueBytes);
|
||||||
|
for (ByteBuffer buf : kvs) {
|
||||||
|
result.put(buf);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
|
||||||
|
block.mark();
|
||||||
|
int keyLength = block.getInt();
|
||||||
|
block.getInt();
|
||||||
|
int pos = block.position();
|
||||||
|
block.reset();
|
||||||
|
ByteBuffer dup = block.duplicate();
|
||||||
|
dup.position(pos);
|
||||||
|
dup.limit(pos + keyLength);
|
||||||
|
return dup.slice();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EncodedSeeker createSeeker(KVComparator comparator,
|
||||||
|
HFileBlockDecodingContext decodingCtx) {
|
||||||
|
return new RowIndexSeekerV1(comparator, decodingCtx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HFileBlockEncodingContext newDataBlockEncodingContext(
|
||||||
|
DataBlockEncoding encoding, byte[] header, HFileContext meta) {
|
||||||
|
return new HFileBlockDefaultEncodingContext(encoding, header, meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
|
||||||
|
return new HFileBlockDefaultDecodingContext(meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.encoding;
|
||||||
|
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RowIndexEncoderV1 {
|
||||||
|
private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class);
|
||||||
|
|
||||||
|
/** The Cell previously appended. */
|
||||||
|
private Cell lastCell = null;
|
||||||
|
|
||||||
|
private DataOutputStream out;
|
||||||
|
private HFileBlockDefaultEncodingContext encodingCtx;
|
||||||
|
private int startOffset = -1;
|
||||||
|
private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(
|
||||||
|
64 * 4);
|
||||||
|
|
||||||
|
public RowIndexEncoderV1(DataOutputStream out,
|
||||||
|
HFileBlockDefaultEncodingContext encodingCtx) {
|
||||||
|
this.out = out;
|
||||||
|
this.encodingCtx = encodingCtx;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int write(Cell cell) throws IOException {
|
||||||
|
// checkKey uses comparator to check we are writing in order.
|
||||||
|
if (!checkRow(cell)) {
|
||||||
|
if (startOffset < 0) {
|
||||||
|
startOffset = out.size();
|
||||||
|
}
|
||||||
|
rowsOffsetBAOS.writeInt(out.size() - startOffset);
|
||||||
|
}
|
||||||
|
int klength = KeyValueUtil.keyLength(cell);
|
||||||
|
int vlength = cell.getValueLength();
|
||||||
|
out.writeInt(klength);
|
||||||
|
out.writeInt(vlength);
|
||||||
|
CellUtil.writeFlatKey(cell, out);
|
||||||
|
// Write the value part
|
||||||
|
out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
|
||||||
|
int encodedKvSize = klength + vlength
|
||||||
|
+ KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
|
||||||
|
// Write the additional tag into the stream
|
||||||
|
if (encodingCtx.getHFileContext().isIncludesTags()) {
|
||||||
|
int tagsLength = cell.getTagsLength();
|
||||||
|
out.writeShort(tagsLength);
|
||||||
|
// There are some tags to be written
|
||||||
|
if (tagsLength > 0) {
|
||||||
|
out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
|
||||||
|
}
|
||||||
|
encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
|
||||||
|
}
|
||||||
|
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
|
||||||
|
WritableUtils.writeVLong(out, cell.getSequenceId());
|
||||||
|
encodedKvSize += WritableUtils.getVIntSize(cell.getSequenceId());
|
||||||
|
}
|
||||||
|
lastCell = cell;
|
||||||
|
return encodedKvSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean checkRow(final Cell cell) throws IOException {
|
||||||
|
boolean isDuplicateRow = false;
|
||||||
|
if (cell == null) {
|
||||||
|
throw new IOException("Key cannot be null or empty");
|
||||||
|
}
|
||||||
|
if (lastCell != null) {
|
||||||
|
int keyComp = KeyValue.COMPARATOR.compareRows(lastCell, cell);
|
||||||
|
if (keyComp > 0) {
|
||||||
|
throw new IOException("Added a key not lexically larger than"
|
||||||
|
+ " previous. Current cell = " + cell + ", lastCell = " + lastCell);
|
||||||
|
} else if (keyComp == 0) {
|
||||||
|
isDuplicateRow = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return isDuplicateRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void flush() throws IOException {
|
||||||
|
int onDiskDataSize = 0;
|
||||||
|
if (startOffset >= 0) {
|
||||||
|
onDiskDataSize = out.size() - startOffset;
|
||||||
|
}
|
||||||
|
// rowsOffsetBAOS.size() / 4
|
||||||
|
out.writeInt(rowsOffsetBAOS.size() >> 2);
|
||||||
|
if (rowsOffsetBAOS.size() > 0) {
|
||||||
|
out.write(rowsOffsetBAOS.getBuffer(), 0, rowsOffsetBAOS.size());
|
||||||
|
}
|
||||||
|
out.writeInt(onDiskDataSize);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("RowNumber: " + (rowsOffsetBAOS.size() >> 2)
|
||||||
|
+ ", onDiskDataSize: " + onDiskDataSize + ", totalOnDiskSize: "
|
||||||
|
+ (out.size() - startOffset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,431 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.encoding;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
import org.apache.hadoop.hbase.NoTagsKeyValue;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RowIndexSeekerV1 implements EncodedSeeker {
|
||||||
|
|
||||||
|
// Decoding context; its HFileContext tells the seeker whether cells carry tags and MVCC.
private HFileBlockDecodingContext decodingCtx;
|
||||||
|
// Comparator used for the row binary search and for the key-by-key scan after it.
private final KVComparator comparator;
|
||||||
|
|
||||||
|
// Slice over the flat-cell section of the block given to setCurrentBuffer;
// its position is the decode cursor.
private ByteBuffer currentBuffer;
|
||||||
|
private SeekerState current = new SeekerState(); // always valid
|
||||||
|
private SeekerState previous = new SeekerState(); // may not be valid
|
||||||
|
|
||||||
|
// Number of row-index entries, read from the block by setCurrentBuffer.
private int rowNumber;
|
||||||
|
// Slice holding rowNumber 4-byte offsets; each one is a position within currentBuffer.
private ByteBuffer rowOffsets = null;
|
||||||
|
|
||||||
|
/**
 * Creates a seeker that is not yet attached to a block; call
 * {@link #setCurrentBuffer(ByteBuffer)} before using it.
 * @param comparator comparator used for row and key comparisons
 * @param decodingCtx context consulted for the tags / MVCC flags
 */
public RowIndexSeekerV1(KVComparator comparator,
    HFileBlockDecodingContext decodingCtx) {
  this.decodingCtx = decodingCtx;
  this.comparator = comparator;
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCurrentBuffer(ByteBuffer buffer) {
|
||||||
|
int onDiskSize = Bytes.toIntUnsafe(buffer.array(), buffer.arrayOffset()
|
||||||
|
+ buffer.limit() - Bytes.SIZEOF_INT);
|
||||||
|
// int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
|
||||||
|
|
||||||
|
// Data part
|
||||||
|
ByteBuffer dup = buffer.duplicate();
|
||||||
|
dup.position(buffer.position());
|
||||||
|
dup.limit(buffer.position() + onDiskSize);
|
||||||
|
currentBuffer = dup.slice();
|
||||||
|
current.currentBuffer = currentBuffer;
|
||||||
|
ByteBufferUtils.skip(buffer, onDiskSize);
|
||||||
|
|
||||||
|
// Row offset
|
||||||
|
rowNumber = buffer.getInt();
|
||||||
|
// equals Bytes.SIZEOF_INT * rowNumber
|
||||||
|
int totalRowOffsetsLength = rowNumber << 2;
|
||||||
|
ByteBuffer rowDup = buffer.duplicate();
|
||||||
|
rowDup.position(buffer.position());
|
||||||
|
rowDup.limit(buffer.position() + totalRowOffsetsLength);
|
||||||
|
rowOffsets = rowDup.slice();
|
||||||
|
|
||||||
|
decodeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getKeyDeepCopy() {
|
||||||
|
ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
|
||||||
|
keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
|
||||||
|
current.keyLength);
|
||||||
|
keyBuffer.rewind();
|
||||||
|
return keyBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer getValueShallowCopy() {
|
||||||
|
ByteBuffer dup = currentBuffer.duplicate();
|
||||||
|
dup.position(current.valueOffset);
|
||||||
|
dup.limit(current.valueOffset + current.valueLength);
|
||||||
|
return dup.slice();
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBuffer getKeyValueBuffer() {
|
||||||
|
ByteBuffer kvBuffer = createKVBuffer();
|
||||||
|
kvBuffer.putInt(current.keyLength);
|
||||||
|
kvBuffer.putInt(current.valueLength);
|
||||||
|
kvBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
|
||||||
|
current.keyLength);
|
||||||
|
ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
|
||||||
|
current.valueOffset, current.valueLength);
|
||||||
|
if (current.tagsLength > 0) {
|
||||||
|
// Put short as unsigned
|
||||||
|
kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
|
||||||
|
kvBuffer.put((byte) (current.tagsLength & 0xff));
|
||||||
|
if (current.tagsOffset != -1) {
|
||||||
|
ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
|
||||||
|
current.tagsOffset, current.tagsLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (includesMvcc()) {
|
||||||
|
ByteBufferUtils.writeVLong(kvBuffer, current.getSequenceId());
|
||||||
|
}
|
||||||
|
kvBuffer.rewind();
|
||||||
|
return kvBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ByteBuffer createKVBuffer() {
|
||||||
|
int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(
|
||||||
|
current.keyLength, current.valueLength, current.tagsLength);
|
||||||
|
if (includesMvcc()) {
|
||||||
|
kvBufSize += WritableUtils.getVIntSize(current.getSequenceId());
|
||||||
|
}
|
||||||
|
ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
|
||||||
|
return kvBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Cell getKeyValue() {
|
||||||
|
return current.toCell();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rewind() {
|
||||||
|
currentBuffer.rewind();
|
||||||
|
decodeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean next() {
|
||||||
|
if (!currentBuffer.hasRemaining()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
decodeNext();
|
||||||
|
previous.invalidate();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int seekToKeyInBlock(byte[] key, int offset, int length,
|
||||||
|
boolean seekBefore) {
|
||||||
|
return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length),
|
||||||
|
seekBefore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int binarySearch(Cell seekCell, boolean seekBefore) {
|
||||||
|
int low = 0;
|
||||||
|
int high = rowNumber - 1;
|
||||||
|
int mid = (low + high) >>> 1;
|
||||||
|
int comp = 0;
|
||||||
|
SimpleMutableByteRange row = new SimpleMutableByteRange();
|
||||||
|
while (low <= high) {
|
||||||
|
mid = (low + high) >>> 1;
|
||||||
|
getRow(mid, row);
|
||||||
|
comp = comparator.compareRows(row.getBytes(), row.getOffset(),
|
||||||
|
row.getLength(), seekCell.getRowArray(), seekCell.getRowOffset(),
|
||||||
|
seekCell.getRowLength());
|
||||||
|
if (comp < 0) {
|
||||||
|
low = mid + 1;
|
||||||
|
} else if (comp > 0) {
|
||||||
|
high = mid - 1;
|
||||||
|
} else {
|
||||||
|
// key found
|
||||||
|
if (seekBefore) {
|
||||||
|
return mid - 1;
|
||||||
|
} else {
|
||||||
|
return mid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// key not found.
|
||||||
|
if (comp > 0) {
|
||||||
|
return mid - 1;
|
||||||
|
} else {
|
||||||
|
return mid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getRow(int index, SimpleMutableByteRange row) {
|
||||||
|
int offset = Bytes.toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset()
|
||||||
|
+ (index << 2)); // index * Bytes.SIZEOF_INT
|
||||||
|
int position = currentBuffer.arrayOffset() + offset + Bytes.SIZEOF_LONG;
|
||||||
|
short rowLen = Bytes.toShortUnsafe(currentBuffer.array(), position);
|
||||||
|
row.set(currentBuffer.array(), position + Bytes.SIZEOF_SHORT, rowLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
|
||||||
|
previous.invalidate();
|
||||||
|
int index = binarySearch(seekCell, seekBefore);
|
||||||
|
if (index < 0) {
|
||||||
|
return HConstants.INDEX_KEY_MAGIC; // using optimized index key
|
||||||
|
} else {
|
||||||
|
int offset = Bytes.toIntUnsafe(rowOffsets.array(),
|
||||||
|
rowOffsets.arrayOffset() + (index << 2));
|
||||||
|
if (offset != 0) {
|
||||||
|
decodeAtPosition(offset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
int comp;
|
||||||
|
comp = comparator.compareOnlyKeyPortion(seekCell, current.currentKey);
|
||||||
|
if (comp == 0) { // exact match
|
||||||
|
if (seekBefore) {
|
||||||
|
if (!previous.isValid()) {
|
||||||
|
// The caller (seekBefore) has to ensure that we are not at the
|
||||||
|
// first key in the block.
|
||||||
|
throw new IllegalStateException("Cannot seekBefore if "
|
||||||
|
+ "positioned at the first key in the block: key="
|
||||||
|
+ Bytes.toStringBinary(seekCell.getRowArray()));
|
||||||
|
}
|
||||||
|
moveToPrevious();
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (comp < 0) { // already too large, check previous
|
||||||
|
if (previous.isValid()) {
|
||||||
|
moveToPrevious();
|
||||||
|
} else {
|
||||||
|
return HConstants.INDEX_KEY_MAGIC; // using optimized index key
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// move to next, if more data is available
|
||||||
|
if (currentBuffer.hasRemaining()) {
|
||||||
|
previous.copyFromNext(current);
|
||||||
|
decodeNext();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
// we hit the end of the block, not an exact match
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void moveToPrevious() {
|
||||||
|
if (!previous.isValid()) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Can move back only once and not in first key in the block.");
|
||||||
|
}
|
||||||
|
|
||||||
|
SeekerState tmp = previous;
|
||||||
|
previous = current;
|
||||||
|
current = tmp;
|
||||||
|
|
||||||
|
// move after last key value
|
||||||
|
currentBuffer.position(current.nextKvOffset);
|
||||||
|
previous.invalidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareKey(KVComparator comparator, byte[] key, int offset,
|
||||||
|
int length) {
|
||||||
|
return comparator.compareFlatKey(key, offset, length,
|
||||||
|
current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
|
||||||
|
current.keyBuffer.getLength());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareKey(KVComparator comparator, Cell key) {
|
||||||
|
return comparator.compareOnlyKeyPortion(key, new KeyValue.KeyOnlyKeyValue(
|
||||||
|
current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
|
||||||
|
current.keyBuffer.getLength()));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decodeFirst() {
|
||||||
|
decodeNext();
|
||||||
|
previous.invalidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decodeAtPosition(int position) {
|
||||||
|
currentBuffer.position(position);
|
||||||
|
decodeNext();
|
||||||
|
previous.invalidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decodeNext() {
|
||||||
|
current.startOffset = currentBuffer.position();
|
||||||
|
int p = currentBuffer.position() + currentBuffer.arrayOffset();
|
||||||
|
long ll = Bytes.toLong(currentBuffer.array(), p);
|
||||||
|
// Read top half as an int of key length and bottom int as value length
|
||||||
|
current.keyLength = (int) (ll >> Integer.SIZE);
|
||||||
|
current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
|
||||||
|
ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_LONG);
|
||||||
|
// key part
|
||||||
|
current.keyBuffer.set(currentBuffer.array(), currentBuffer.arrayOffset()
|
||||||
|
+ currentBuffer.position(), current.keyLength);
|
||||||
|
ByteBufferUtils.skip(currentBuffer, current.keyLength);
|
||||||
|
// value part
|
||||||
|
current.valueOffset = currentBuffer.position();
|
||||||
|
ByteBufferUtils.skip(currentBuffer, current.valueLength);
|
||||||
|
if (includesTags()) {
|
||||||
|
decodeTags();
|
||||||
|
}
|
||||||
|
if (includesMvcc()) {
|
||||||
|
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
|
||||||
|
} else {
|
||||||
|
current.memstoreTS = 0;
|
||||||
|
}
|
||||||
|
current.nextKvOffset = currentBuffer.position();
|
||||||
|
current.setKey(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
|
||||||
|
current.keyBuffer.getLength());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean includesMvcc() {
|
||||||
|
return this.decodingCtx.getHFileContext().isIncludesMvcc();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean includesTags() {
|
||||||
|
return this.decodingCtx.getHFileContext().isIncludesTags();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decodeTags() {
|
||||||
|
current.tagsLength = currentBuffer.getShort();
|
||||||
|
current.tagsOffset = currentBuffer.position();
|
||||||
|
ByteBufferUtils.skip(currentBuffer, current.tagsLength);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected class SeekerState {
|
||||||
|
/**
|
||||||
|
* The size of a (key length, value length) tuple that prefixes each entry
|
||||||
|
* in a data block.
|
||||||
|
*/
|
||||||
|
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
|
||||||
|
|
||||||
|
// Data buffer this state refers to; assigned in setCurrentBuffer and cleared by invalidate().
protected ByteBuffer currentBuffer;
|
||||||
|
// Position in the data buffer where this cell begins (set by decodeNext).
protected int startOffset = -1;
|
||||||
|
// Position in the data buffer where the value bytes begin; -1 marks this state as not valid.
protected int valueOffset = -1;
|
||||||
|
// Key length, taken from the upper int of the 8-byte length prefix.
protected int keyLength;
|
||||||
|
// Value length, taken from the lower int of the 8-byte length prefix.
protected int valueLength;
|
||||||
|
// Length of the tags in bytes (set by decodeTags; stays 0 when tags are not decoded).
protected int tagsLength = 0;
|
||||||
|
// Position in the data buffer where the tag bytes begin (set by decodeTags).
protected int tagsOffset = -1;
|
||||||
|
|
||||||
|
// Range over the key bytes inside the data buffer's backing array; no copy is made.
protected SimpleMutableByteRange keyBuffer = new SimpleMutableByteRange();
|
||||||
|
// Sequence id read as a vlong when MVCC is included, otherwise 0.
protected long memstoreTS;
|
||||||
|
// Position in the data buffer just past this cell.
protected int nextKvOffset;
|
||||||
|
// Key-only cell over the same key bytes; used for key comparisons while seeking.
protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
|
||||||
|
|
||||||
|
protected boolean isValid() {
|
||||||
|
return valueOffset != -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void invalidate() {
|
||||||
|
valueOffset = -1;
|
||||||
|
currentKey = new KeyValue.KeyOnlyKeyValue();
|
||||||
|
currentBuffer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setKey(byte[] key, int offset, int length) {
|
||||||
|
currentKey.setKey(key, offset, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getSequenceId() {
|
||||||
|
return memstoreTS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy the state from the next one into this instance (the previous state
|
||||||
|
* placeholder). Used to save the previous state when we are advancing the
|
||||||
|
* seeker to the next key/value.
|
||||||
|
*/
|
||||||
|
protected void copyFromNext(SeekerState nextState) {
|
||||||
|
keyBuffer.set(nextState.keyBuffer.getBytes(),
|
||||||
|
nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
|
||||||
|
currentKey.setKey(nextState.keyBuffer.getBytes(),
|
||||||
|
nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
|
||||||
|
|
||||||
|
startOffset = nextState.startOffset;
|
||||||
|
valueOffset = nextState.valueOffset;
|
||||||
|
keyLength = nextState.keyLength;
|
||||||
|
valueLength = nextState.valueLength;
|
||||||
|
nextKvOffset = nextState.nextKvOffset;
|
||||||
|
memstoreTS = nextState.memstoreTS;
|
||||||
|
currentBuffer = nextState.currentBuffer;
|
||||||
|
tagsOffset = nextState.tagsOffset;
|
||||||
|
tagsLength = nextState.tagsLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return CellUtil.getCellKeyAsString(toCell());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getCellBufSize() {
|
||||||
|
int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
|
||||||
|
if (includesTags()) {
|
||||||
|
kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
|
||||||
|
}
|
||||||
|
return kvBufSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Cell formNoTagsKeyValue() {
|
||||||
|
NoTagsKeyValue ret = new NoTagsKeyValue(currentBuffer.array(),
|
||||||
|
currentBuffer.arrayOffset() + startOffset, getCellBufSize());
|
||||||
|
if (includesMvcc()) {
|
||||||
|
ret.setSequenceId(memstoreTS);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Cell toCell() {
|
||||||
|
if (tagsOffset > 0) {
|
||||||
|
KeyValue ret = new KeyValue(currentBuffer.array(),
|
||||||
|
currentBuffer.arrayOffset() + startOffset, getCellBufSize());
|
||||||
|
if (includesMvcc()) {
|
||||||
|
ret.setSequenceId(memstoreTS);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
} else {
|
||||||
|
return formNoTagsKeyValue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -83,7 +83,7 @@ public class TestSeekToBlockWithEncoders {
|
||||||
KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
sampleKv.add(kv4);
|
sampleKv.add(kv4);
|
||||||
KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue kv5 = new KeyValue(Bytes.toBytes("aaddd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
sampleKv.add(kv5);
|
sampleKv.add(kv5);
|
||||||
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
|
@ -106,7 +106,7 @@ public class TestSeekToBlockWithEncoders {
|
||||||
KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
sampleKv.add(kv3);
|
sampleKv.add(kv3);
|
||||||
KeyValue kv4 = new KeyValue(Bytes.toBytes("aaae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue kv4 = new KeyValue(Bytes.toBytes("aade"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
sampleKv.add(kv4);
|
sampleKv.add(kv4);
|
||||||
KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
|
||||||
|
|
Loading…
Reference in New Issue