From c899897bc8dc4a7eccc9e2a80fd05ad55654f18e Mon Sep 17 00:00:00 2001
From: anoopsamjohn
Date: Mon, 29 Aug 2016 12:11:46 +0530
Subject: [PATCH] HBASE-16213 A new HFileBlock structure for fast random get.
 (binlijin)
---
 .../hbase/io/ByteArrayOutputStream.java       | 127 ++++
 .../hbase/io/encoding/DataBlockEncoding.java  |   3 +-
 .../hbase/io/encoding/RowIndexCodecV1.java    | 165 +++++++
 .../hbase/io/encoding/RowIndexEncoderV1.java  | 115 +++++
 .../hbase/io/encoding/RowIndexSeekerV1.java   | 431 ++++++++++++++++++
 .../encoding/TestSeekToBlockWithEncoders.java |   4 +-
 6 files changed, 842 insertions(+), 3 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
new file mode 100644
index 00000000000..a3c571fe93c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Our own implementation of ByteArrayOutputStream where none of the
+ * methods are synchronized.
+ */
+@InterfaceAudience.Private
+public class ByteArrayOutputStream extends OutputStream {
+
+  // Borrowed from openJDK:
+  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private byte[] buf;
+  private int pos = 0;
+
+  public ByteArrayOutputStream() {
+    this(32);
+  }
+
+  public ByteArrayOutputStream(int capacity) {
+    this.buf = new byte[capacity];
+  }
+
+  /**
+   * Writes an int to the underlying output stream as four
+   * bytes, high byte first.
+   * @param i the int to write
+   * @throws IOException if an I/O error occurs.
+   */
+  public void writeInt(int i) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_INT);
+    Bytes.putInt(this.buf, this.pos, i);
+    this.pos += Bytes.SIZEOF_INT;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+    buf[this.pos] = (byte) b;
+    this.pos++;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+    System.arraycopy(b, off, this.buf, this.pos, len);
+    this.pos += len;
+  }
+
+  private void checkSizeAndGrow(int extra) {
+    long capacityNeeded = this.pos + (long) extra;
+    if (capacityNeeded > this.buf.length) {
+      // guarantee it's possible to fit
+      if (capacityNeeded > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      // double until we hit the cap
+      long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE);
+      // but make sure there is enough if twice the existing capacity is still
+      // too small
+      nextCapacity = Math.max(nextCapacity, capacityNeeded);
+      if (nextCapacity > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      byte[] newBuf = new byte[(int) nextCapacity];
+      System.arraycopy(buf, 0, newBuf, 0, buf.length);
+      buf = newBuf;
+    }
+  }
+
+  /**
+   * Resets the pos field of this byte array output stream to zero.
+   * The output stream can be used again.
+   */
+  public void reset() {
+    this.pos = 0;
+  }
+
+  /**
+   * Copies the content of this stream into a new byte array.
+   *
+   * @return the contents of this output stream, as a new byte array.
+   */
+  public byte[] toByteArray() {
+    return Arrays.copyOf(buf, pos);
+  }
+
+  /**
+   * @return the underlying array where the data gets accumulated
+   */
+  public byte[] getBuffer() {
+    return this.buf;
+  }
+
+  /**
+   * @return The current size of the buffer.
+   */
+  public int size() {
+    return this.pos;
+  }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
index 67d18ed7712..71b55e2db18 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
@@ -43,7 +43,8 @@ public enum DataBlockEncoding {
   FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"),
   // id 5 is reserved for the COPY_KEY algorithm for benchmarking
   // COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"),
-  PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
+  PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"),
+  ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.RowIndexCodecV1");
 
   private final short id;
   private final byte[] idInBytes;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
new file mode 100644
index 00000000000..f18e09467f6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stores the cells in a flat section, followed by every row's start offset,
+ * so we can binary-search straight to a row's cells.
+ * Block format:
+ * flat cells
+ * integer: number of rows
+ * integer: row0's offset
+ * integer: row1's offset
+ * ....
+ * integer: dataSize
+ *
+ */
+@InterfaceAudience.Private
+public class RowIndexCodecV1 implements DataBlockEncoder {
+
+  private static class RowIndexEncodingState extends EncodingState {
+    RowIndexEncoderV1 encoder = null;
+  }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
+      DataOutputStream out) throws IOException {
+    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+      throw new IOException(this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
+          + "encoding context.");
+    }
+
+    HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    encodingCtx.prepareEncoding(out);
+
+    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
+    RowIndexEncodingState state = new RowIndexEncodingState();
+    state.encoder = encoder;
+    blkEncodingCtx.setEncodingState(state);
+  }
+
+  @Override
+  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+      DataOutputStream out) throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
+        .getEncodingState();
+    RowIndexEncoderV1 encoder = state.encoder;
+    return encoder.write(cell);
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
+      DataOutputStream out, byte[] uncompressedBytesWithHeader)
+      throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
+        .getEncodingState();
+    RowIndexEncoderV1 encoder = state.encoder;
+    encoder.flush();
+    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+    } else {
+      encodingCtx.postEncoding(BlockType.DATA);
+    }
+  }
+
+  @Override
+  public ByteBuffer decodeKeyValues(DataInputStream source,
+      HFileBlockDecodingContext decodingCtx) throws IOException {
+    if (!decodingCtx.getHFileContext().isIncludesTags()) {
+      ByteBuffer sourceAsBuffer = ByteBufferUtils
+          .drainInputStreamToBuffer(source); // waste: copies the whole stream
+      sourceAsBuffer.mark();
+      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
+      int onDiskSize = sourceAsBuffer.getInt();
+      sourceAsBuffer.reset();
+      ByteBuffer dup = sourceAsBuffer.duplicate();
+      dup.position(sourceAsBuffer.position());
+      dup.limit(sourceAsBuffer.position() + onDiskSize);
+      return dup.slice();
+    } else {
+      ByteBuffer sourceAsBuffer = ByteBufferUtils
+          .drainInputStreamToBuffer(source); // waste: copies the whole stream
+      sourceAsBuffer.mark();
+      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(KeyValue.COMPARATOR,
+          decodingCtx);
+      seeker.setCurrentBuffer(sourceAsBuffer);
+      List<ByteBuffer> kvs = new ArrayList<ByteBuffer>();
+      kvs.add(seeker.getKeyValueBuffer());
+      while (seeker.next()) {
+        kvs.add(seeker.getKeyValueBuffer());
+      }
+      int totalLength = 0;
+      for (ByteBuffer buf : kvs) {
+        totalLength += buf.remaining();
+      }
+      byte[] keyValueBytes = new byte[totalLength];
+      ByteBuffer result = ByteBuffer.wrap(keyValueBytes);
+      for (ByteBuffer buf : kvs) {
+        result.put(buf);
+      }
+      return result;
+    }
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    int keyLength = block.getInt();
+    block.getInt();
+    int pos = block.position();
+    block.reset();
+    ByteBuffer dup = block.duplicate();
+    dup.position(pos);
+    dup.limit(pos + keyLength);
+    return dup.slice();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(KVComparator comparator,
+      HFileBlockDecodingContext decodingCtx) {
+    return new RowIndexSeekerV1(comparator, decodingCtx);
+  }
+
+  @Override
+  public HFileBlockEncodingContext newDataBlockEncodingContext(
+      DataBlockEncoding encoding, byte[] header, HFileContext meta) {
+    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
+    return new HFileBlockDefaultDecodingContext(meta);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
new file mode 100644
index 00000000000..888ef9e95f1
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexEncoderV1 {
+  private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class);
+
+  /** The Cell previously appended. */
+  private Cell lastCell = null;
+
+  private DataOutputStream out;
+  private HFileBlockDefaultEncodingContext encodingCtx;
+  private int startOffset = -1;
+  private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(
+      64 * 4);
+
+  public RowIndexEncoderV1(DataOutputStream out,
+      HFileBlockDefaultEncodingContext encodingCtx) {
+    this.out = out;
+    this.encodingCtx = encodingCtx;
+  }
+
+  public int write(Cell cell) throws IOException {
+    // checkRow uses the comparator to check we are writing in order.
+    if (!checkRow(cell)) {
+      if (startOffset < 0) {
+        startOffset = out.size();
+      }
+      rowsOffsetBAOS.writeInt(out.size() - startOffset);
+    }
+    int klength = KeyValueUtil.keyLength(cell);
+    int vlength = cell.getValueLength();
+    out.writeInt(klength);
+    out.writeInt(vlength);
+    CellUtil.writeFlatKey(cell, out);
+    // Write the value part
+    out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
+    int encodedKvSize = klength + vlength
+        + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+    // Write the additional tags into the stream
+    if (encodingCtx.getHFileContext().isIncludesTags()) {
+      int tagsLength = cell.getTagsLength();
+      out.writeShort(tagsLength);
+      // There are some tags to be written
+      if (tagsLength > 0) {
+        out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+      }
+      encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+    }
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      WritableUtils.writeVLong(out, cell.getSequenceId());
+      encodedKvSize += WritableUtils.getVIntSize(cell.getSequenceId());
+    }
+    lastCell = cell;
+    return encodedKvSize;
+  }
+
+  protected boolean checkRow(final Cell cell) throws IOException {
+    boolean isDuplicateRow = false;
+    if (cell == null) {
+      throw new IOException("Cell cannot be null");
+    }
+    if (lastCell != null) {
+      int keyComp = KeyValue.COMPARATOR.compareRows(lastCell, cell);
+      if (keyComp > 0) {
+        throw new IOException("Added a key not lexically larger than"
+            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
+      } else if (keyComp == 0) {
+        isDuplicateRow = true;
+      }
+    }
+    return isDuplicateRow;
+  }
+
+  public void flush() throws IOException {
+    int onDiskDataSize = 0;
+    if (startOffset >= 0) {
+      onDiskDataSize = out.size() - startOffset;
+    }
+    // rowsOffsetBAOS.size() / 4 is the number of rows
+    out.writeInt(rowsOffsetBAOS.size() >> 2);
+    if (rowsOffsetBAOS.size() > 0) {
+      out.write(rowsOffsetBAOS.getBuffer(), 0, rowsOffsetBAOS.size());
+    }
+    out.writeInt(onDiskDataSize);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("RowNumber: " + (rowsOffsetBAOS.size() >> 2)
+          + ", onDiskDataSize: " + onDiskDataSize + ", totalOnDiskSize: "
+          + (out.size() - startOffset));
+    }
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
new file mode 100644
index 00000000000..a3289d69782
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexSeekerV1 implements EncodedSeeker {
+
+  private HFileBlockDecodingContext decodingCtx;
+  private final KVComparator comparator;
+
+  private ByteBuffer currentBuffer;
+  private SeekerState current = new SeekerState(); // always valid
+  private SeekerState previous = new SeekerState(); // may not be valid
+
+  private int rowNumber;
+  private ByteBuffer rowOffsets = null;
+
+  public RowIndexSeekerV1(KVComparator comparator,
+      HFileBlockDecodingContext decodingCtx) {
+    this.comparator = comparator;
+    this.decodingCtx = decodingCtx;
+  }
+
+  @Override
+  public void setCurrentBuffer(ByteBuffer buffer) {
+    int onDiskSize = Bytes.toIntUnsafe(buffer.array(), buffer.arrayOffset()
+        + buffer.limit() - Bytes.SIZEOF_INT);
+    // int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
+
+    // Data part
+    ByteBuffer dup = buffer.duplicate();
+    dup.position(buffer.position());
+    dup.limit(buffer.position() + onDiskSize);
+    currentBuffer = dup.slice();
+    current.currentBuffer = currentBuffer;
+    ByteBufferUtils.skip(buffer, onDiskSize);
+
+    // Row offset
+    rowNumber = buffer.getInt();
+    // equals Bytes.SIZEOF_INT * rowNumber
+    int totalRowOffsetsLength = rowNumber << 2;
+    ByteBuffer rowDup = buffer.duplicate();
+    rowDup.position(buffer.position());
+    rowDup.limit(buffer.position() + totalRowOffsetsLength);
+    rowOffsets = rowDup.slice();
+
+    decodeFirst();
+  }
+
+  @Override
+  public ByteBuffer getKeyDeepCopy() {
+    ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
+    keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyLength);
+    keyBuffer.rewind();
+    return keyBuffer;
+  }
+
+  @Override
+  public ByteBuffer getValueShallowCopy() {
+    ByteBuffer dup = currentBuffer.duplicate();
+    dup.position(current.valueOffset);
+    dup.limit(current.valueOffset + current.valueLength);
+    return dup.slice();
+  }
+
+  ByteBuffer getKeyValueBuffer() {
+    ByteBuffer kvBuffer = createKVBuffer();
+    kvBuffer.putInt(current.keyLength);
+    kvBuffer.putInt(current.valueLength);
+    kvBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyLength);
+    ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
+        current.valueOffset, current.valueLength);
+    if (current.tagsLength > 0) {
+      // Put short as unsigned
+      kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
+      kvBuffer.put((byte) (current.tagsLength & 0xff));
+      if (current.tagsOffset != -1) {
+        ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
+            current.tagsOffset, current.tagsLength);
+      }
+    }
+    if (includesMvcc()) {
+      ByteBufferUtils.writeVLong(kvBuffer, current.getSequenceId());
+    }
+    kvBuffer.rewind();
+    return kvBuffer;
+  }
+
+  protected ByteBuffer createKVBuffer() {
+    int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(
+        current.keyLength, current.valueLength, current.tagsLength);
+    if (includesMvcc()) {
+      kvBufSize += WritableUtils.getVIntSize(current.getSequenceId());
+    }
+    ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
+    return kvBuffer;
+  }
+
+  @Override
+  public Cell getKeyValue() {
+    return current.toCell();
+  }
+
+  @Override
+  public void rewind() {
+    currentBuffer.rewind();
+    decodeFirst();
+  }
+
+  @Override
+  public boolean next() {
+    if (!currentBuffer.hasRemaining()) {
+      return false;
+    }
+    decodeNext();
+    previous.invalidate();
+    return true;
+  }
+
+  @Override
+  public int seekToKeyInBlock(byte[] key, int offset, int length,
+      boolean seekBefore) {
+    return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length),
+        seekBefore);
+  }
+
+  private int binarySearch(Cell seekCell, boolean seekBefore) {
+    int low = 0;
+    int high = rowNumber - 1;
+    int mid = (low + high) >>> 1;
+    int comp = 0;
+    SimpleMutableByteRange row = new SimpleMutableByteRange();
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      getRow(mid, row);
+      comp = comparator.compareRows(row.getBytes(), row.getOffset(),
+          row.getLength(), seekCell.getRowArray(), seekCell.getRowOffset(),
+          seekCell.getRowLength());
+      if (comp < 0) {
+        low = mid + 1;
+      } else if (comp > 0) {
+        high = mid - 1;
+      } else {
+        // key found
+        if (seekBefore) {
+          return mid - 1;
+        } else {
+          return mid;
+        }
+      }
+    }
+    // key not found.
+    if (comp > 0) {
+      return mid - 1;
+    } else {
+      return mid;
+    }
+  }
+
+  private void getRow(int index, SimpleMutableByteRange row) {
+    int offset = Bytes.toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset()
+        + (index << 2)); // index * Bytes.SIZEOF_INT
+    int position = currentBuffer.arrayOffset() + offset + Bytes.SIZEOF_LONG;
+    short rowLen = Bytes.toShortUnsafe(currentBuffer.array(), position);
+    row.set(currentBuffer.array(), position + Bytes.SIZEOF_SHORT, rowLen);
+  }
+
+  @Override
+  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
+    previous.invalidate();
+    int index = binarySearch(seekCell, seekBefore);
+    if (index < 0) {
+      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+    } else {
+      int offset = Bytes.toIntUnsafe(rowOffsets.array(),
+          rowOffsets.arrayOffset() + (index << 2));
+      if (offset != 0) {
+        decodeAtPosition(offset);
+      }
+    }
+    do {
+      int comp;
+      comp = comparator.compareOnlyKeyPortion(seekCell, current.currentKey);
+      if (comp == 0) { // exact match
+        if (seekBefore) {
+          if (!previous.isValid()) {
+            // The caller (seekBefore) has to ensure that we are not at the
+            // first key in the block.
+            throw new IllegalStateException("Cannot seekBefore if "
+                + "positioned at the first key in the block: key="
+                + Bytes.toStringBinary(seekCell.getRowArray()));
+          }
+          moveToPrevious();
+          return 1;
+        }
+        return 0;
+      }
+
+      if (comp < 0) { // already too large, check previous
+        if (previous.isValid()) {
+          moveToPrevious();
+        } else {
+          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+        }
+        return 1;
+      }
+
+      // move to next, if more data is available
+      if (currentBuffer.hasRemaining()) {
+        previous.copyFromNext(current);
+        decodeNext();
+      } else {
+        break;
+      }
+    } while (true);
+
+    // we hit the end of the block, not an exact match
+    return 1;
+  }
+
+  private void moveToPrevious() {
+    if (!previous.isValid()) {
+      throw new IllegalStateException(
+          "Can only move back once, and not from the first key in the block.");
+    }
+
+    SeekerState tmp = previous;
+    previous = current;
+    current = tmp;
+
+    // move after last key value
+    currentBuffer.position(current.nextKvOffset);
+    previous.invalidate();
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, byte[] key, int offset,
+      int length) {
+    return comparator.compareFlatKey(key, offset, length,
+        current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength());
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, Cell key) {
+    return comparator.compareOnlyKeyPortion(key, new KeyValue.KeyOnlyKeyValue(
+        current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength()));
+  }
+
+  protected void decodeFirst() {
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeAtPosition(int position) {
+    currentBuffer.position(position);
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeNext() {
+    current.startOffset = currentBuffer.position();
+    int p = currentBuffer.position() + currentBuffer.arrayOffset();
+    long ll = Bytes.toLong(currentBuffer.array(), p);
+    // Top half of the long is the key length; bottom half is the value length
+    current.keyLength = (int) (ll >> Integer.SIZE);
+    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
+    ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_LONG);
+    // key part
+    current.keyBuffer.set(currentBuffer.array(), currentBuffer.arrayOffset()
+        + currentBuffer.position(), current.keyLength);
+    ByteBufferUtils.skip(currentBuffer, current.keyLength);
+    // value part
+    current.valueOffset = currentBuffer.position();
+    ByteBufferUtils.skip(currentBuffer, current.valueLength);
+    if (includesTags()) {
+      decodeTags();
+    }
+    if (includesMvcc()) {
+      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+    } else {
+      current.memstoreTS = 0;
+    }
+    current.nextKvOffset = currentBuffer.position();
+    current.setKey(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength());
+  }
+
+  protected boolean includesMvcc() {
+    return this.decodingCtx.getHFileContext().isIncludesMvcc();
+  }
+
+  protected boolean includesTags() {
+    return this.decodingCtx.getHFileContext().isIncludesTags();
+  }
+
+  protected void decodeTags() {
+    current.tagsLength = currentBuffer.getShort();
+    current.tagsOffset = currentBuffer.position();
+    ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+  }
+
+  protected class SeekerState {
+    /**
+     * The size of a (key length, value length) tuple that prefixes each entry
+     * in a data block.
+     */
+    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+    protected ByteBuffer currentBuffer;
+    protected int startOffset = -1;
+    protected int valueOffset = -1;
+    protected int keyLength;
+    protected int valueLength;
+    protected int tagsLength = 0;
+    protected int tagsOffset = -1;
+
+    protected SimpleMutableByteRange keyBuffer = new SimpleMutableByteRange();
+    protected long memstoreTS;
+    protected int nextKvOffset;
+    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
+
+    protected boolean isValid() {
+      return valueOffset != -1;
+    }
+
+    protected void invalidate() {
+      valueOffset = -1;
+      currentKey = new KeyValue.KeyOnlyKeyValue();
+      currentBuffer = null;
+    }
+
+    protected void setKey(byte[] key, int offset, int length) {
+      currentKey.setKey(key, offset, length);
+    }
+
+    protected long getSequenceId() {
+      return memstoreTS;
+    }
+
+    /**
+     * Copy the state from the next one into this instance (the previous state
+     * placeholder). Used to save the previous state when we are advancing the
+     * seeker to the next key/value.
+     */
+    protected void copyFromNext(SeekerState nextState) {
+      keyBuffer.set(nextState.keyBuffer.getBytes(),
+          nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
+      currentKey.setKey(nextState.keyBuffer.getBytes(),
+          nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
+
+      startOffset = nextState.startOffset;
+      valueOffset = nextState.valueOffset;
+      keyLength = nextState.keyLength;
+      valueLength = nextState.valueLength;
+      nextKvOffset = nextState.nextKvOffset;
+      memstoreTS = nextState.memstoreTS;
+      currentBuffer = nextState.currentBuffer;
+      tagsOffset = nextState.tagsOffset;
+      tagsLength = nextState.tagsLength;
+    }
+
+    @Override
+    public String toString() {
+      return CellUtil.getCellKeyAsString(toCell());
+    }
+
+    protected int getCellBufSize() {
+      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
+      if (includesTags()) {
+        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
+      }
+      return kvBufSize;
+    }
+
+    protected Cell formNoTagsKeyValue() {
+      NoTagsKeyValue ret = new NoTagsKeyValue(currentBuffer.array(),
+          currentBuffer.arrayOffset() + startOffset, getCellBufSize());
+      if (includesMvcc()) {
+        ret.setSequenceId(memstoreTS);
+      }
+      return ret;
+    }
+
+    public Cell toCell() {
+      if (tagsOffset > 0) {
+        KeyValue ret = new KeyValue(currentBuffer.array(),
+            currentBuffer.arrayOffset() + startOffset, getCellBufSize());
+        if (includesMvcc()) {
+          ret.setSequenceId(memstoreTS);
+        }
+        return ret;
+      } else {
+        return formNoTagsKeyValue();
+      }
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index 914a37bbb76..5f8c1ef4af0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -83,7 +83,7 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv4);
-    KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
+    KeyValue kv5 = new KeyValue(Bytes.toBytes("aaddd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("q1"), @@ -106,7 +106,7 @@ public class TestSeekToBlockWithEncoders { KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv3); - KeyValue kv4 = new KeyValue(Bytes.toBytes("aaae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), + KeyValue kv4 = new KeyValue(Bytes.toBytes("aade"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val")); sampleKv.add(kv4); KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),