HBASE-10835 DBE encode path improvements. (Anoop)

anoopsjohn 2014-05-22 11:59:52 +05:30
parent cb1428ddca
commit 53513dcb45
25 changed files with 1035 additions and 1089 deletions

View File

@ -474,10 +474,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
abstract protected void decodeNext();
}
protected final void afterEncodingKeyValue(ByteBuffer in,
DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
/**
* @param kv
* @param out
* @param encodingCtx
* @return unencoded size added
* @throws IOException
*/
protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
int size = 0;
if (encodingCtx.getHFileContext().isIncludesTags()) {
short tagsLength = in.getShort();
short tagsLength = kv.getTagsLength();
ByteBufferUtils.putCompressedInt(out, tagsLength);
// There are some tags to be written
if (tagsLength > 0) {
@ -485,23 +493,23 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// When tag compression is enabled, tagCompressionContext will have a non-null value. Write
// the tags using Dictionary compression in such a case
if (tagCompressionContext != null) {
tagCompressionContext.compressTags(out, in, tagsLength);
tagCompressionContext
.compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
} else {
ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
}
}
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
// Copy memstore timestamp from the byte buffer to the output stream.
long memstoreTS = -1;
try {
memstoreTS = ByteBufferUtils.readVLong(in);
WritableUtils.writeVLong(out, memstoreTS);
} catch (IOException ex) {
throw new RuntimeException("Unable to copy memstore timestamp " +
memstoreTS + " after encoding a key/value");
}
long memstoreTS = kv.getMvccVersion();
WritableUtils.writeVLong(out, memstoreTS);
// TODO: use a writeVLong which returns the number of bytes written so that the value does
// not have to be parsed twice.
size += WritableUtils.getVIntSize(memstoreTS);
}
return size;
}
protected final void afterDecodingKeyValue(DataInputStream source,
@ -545,57 +553,10 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
return new HFileBlockDefaultDecodingContext(meta);
}
/**
* Compress KeyValues and write them to output buffer.
* @param out Where to write compressed data.
* @param in Source of KeyValue for compression.
* @param encodingCtx use the Encoding ctx associated with the current block
* @throws IOException If there is an error writing to output stream.
*/
public abstract void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;
protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
throws IOException;
@Override
public void encodeKeyValues(ByteBuffer in,
HFileBlockEncodingContext blkEncodingCtx) throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException (this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding();
DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
if (encodingCtx.getHFileContext().isIncludesTags()
&& encodingCtx.getHFileContext().isCompressTags()) {
if (encodingCtx.getTagCompressionContext() != null) {
// Creating a TagCompressionContext for every block encoding would be costly; reuse the
// existing one and just clear it.
encodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
encodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
internalEncodeKeyValues(dataOut, in, encodingCtx);
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
/**
* Asserts that there is at least the given amount of unfilled space
* remaining in the given buffer.
@ -613,4 +574,68 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
@Override
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException (this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding(out);
if (encodingCtx.getHFileContext().isIncludesTags()
&& encodingCtx.getHFileContext().isCompressTags()) {
if (encodingCtx.getTagCompressionContext() != null) {
// Creating a TagCompressionContext for every block encoding would be costly; reuse the
// existing one and just clear it.
encodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
encodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
}
private static class BufferedDataBlockEncodingState extends EncodingState {
int unencodedDataSizeWritten = 0;
}
@Override
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
.getEncodingState();
int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
state.unencodedDataSizeWritten += encodedKvSize;
return encodedKvSize;
}
public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
DataOutputStream out) throws IOException;
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException {
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
.getEncodingState();
// Write the unencodedDataSizeWritten (with header size)
Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
+ DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
);
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
}
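For reference, the new write path in this class is: startBlockEncoding() writes a 4-byte dummy length right after the block header, each encode() call appends one KeyValue and accumulates its unencoded size in the per-block encoding state, and endBlockEncoding() patches the real size back over the dummy. A minimal JDK-only sketch of that placeholder-then-patch pattern follows; the class and variable names are illustrative, not the HBase ones.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics the dummy-length-then-patch flow, not the HBase classes.
public class PlaceholderPatchSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);

    out.writeInt(0);                     // startBlockEncoding(): dummy length, patched below
    int unencodedSizeWritten = 0;

    for (String cell : new String[] { "row1/cf:q/v1", "row2/cf:q/v2" }) {
      byte[] b = cell.getBytes(StandardCharsets.UTF_8);
      out.write(b);                      // encode(): write one cell
      unencodedSizeWritten += b.length;  // accumulate, as the per-block encoding state does
    }

    out.flush();
    byte[] block = buffer.toByteArray(); // endBlockEncoding(): patch the real size over the dummy
    block[0] = (byte) (unencodedSizeWritten >>> 24);
    block[1] = (byte) (unencodedSizeWritten >>> 16);
    block[2] = (byte) (unencodedSizeWritten >>> 8);
    block[3] = (byte) unencodedSizeWritten;
    System.out.println("block bytes: " + block.length + ", unencoded payload: " + unencodedSizeWritten);
  }
}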

View File

@ -22,9 +22,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
* Just copy data, do not do any kind of compression. Use for comparison and
@ -32,14 +34,33 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
@Override
public void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());
ByteBufferUtils.moveBufferToStream(out, in, in.limit());
}
@Override
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
DataOutputStream out) throws IOException {
int klength = kv.getKeyLength();
int vlength = kv.getValueLength();
out.writeInt(klength);
out.writeInt(vlength);
out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
// Write the additional tag into the stream
if (encodingContext.getHFileContext().isIncludesTags()) {
short tagsLength = kv.getTagsLength();
out.writeShort(tagsLength);
if (tagsLength > 0) {
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
}
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingContext.getHFileContext().isIncludesMvcc()) {
WritableUtils.writeVLong(out, kv.getMvccVersion());
size += WritableUtils.getVIntSize(kv.getMvccVersion());
}
return size;
}
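The copy encoder now serializes each KeyValue directly from its backing arrays: key length, value length, key bytes, value bytes, then the optional tags and MVCC vlong handled above. A rough JDK-only sketch of that base layout (tags and MVCC omitted; the names are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative only: the unencoded key/value layout, without tags or the MVCC vlong.
public class RawKeyValueLayoutSketch {
  static int writeCell(DataOutputStream out, byte[] key, byte[] value) throws IOException {
    out.writeInt(key.length);   // key length
    out.writeInt(value.length); // value length
    out.write(key);             // key bytes
    out.write(value);           // value bytes
    return 4 + 4 + key.length + value.length; // the unencoded size internalEncode() reports
  }

  public static void main(String[] args) throws IOException {
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    int size = writeCell(out, "row1/cf:q".getBytes(StandardCharsets.UTF_8),
        "value1".getBytes(StandardCharsets.UTF_8));
    System.out.println("unencoded bytes written: " + size);
  }
}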
@Override
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {

View File

@ -17,11 +17,13 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@ -34,28 +36,42 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
* <li>knowledge of Key Value format</li>
* </ul>
* It is designed to work fast enough to be feasible as in memory compression.
*
* After encoding, it also optionally compresses the encoded data if a
* compression algorithm is specified in HFileBlockEncodingContext argument of
* {@link #encodeKeyValues(ByteBuffer, HFileBlockEncodingContext)}.
*/
@InterfaceAudience.Private
public interface DataBlockEncoder {
/**
* Encodes KeyValues. It will first encode key value pairs, and then
* optionally do the compression for the encoded data.
*
* @param in
* Source of KeyValue for compression.
* Starts encoding for a block of KeyValues. Call
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
* encoding of a block.
* @param encodingCtx
* the encoding context which will contain encoded uncompressed bytes
* as well as compressed encoded bytes if compression is enabled, and
* also it will reuse resources across multiple calls.
* @param out
* @throws IOException
* If there is an error writing to output stream.
*/
void encodeKeyValues(ByteBuffer in, HFileBlockEncodingContext encodingCtx) throws IOException;
void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;
/**
* Encodes a KeyValue.
* @param kv
* @param encodingCtx
* @param out
* @return unencoded kv size written
* @throws IOException
*/
int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;
/**
* Ends encoding for a block of KeyValues. Gives the encoder a chance to finalize the encoded
* block. It must be called at the end of block encoding.
* @param encodingCtx
* @param out
* @param uncompressedBytesWithHeader
* @throws IOException
*/
void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException;
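To make the new contract concrete, here is a minimal sketch of the calling order a block writer follows; CellEncoder and PassThroughEncoder are simplified stand-ins, not the HBase types.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative mirror of the new contract with simplified types.
public class BlockEncodingLifecycleSketch {
  interface CellEncoder {
    void startBlockEncoding(DataOutputStream out) throws IOException;
    int encode(byte[] cell, DataOutputStream out) throws IOException; // returns unencoded size
    void endBlockEncoding(DataOutputStream out) throws IOException;
  }

  static class PassThroughEncoder implements CellEncoder {
    public void startBlockEncoding(DataOutputStream out) { /* per-block setup would go here */ }
    public int encode(byte[] cell, DataOutputStream out) throws IOException {
      out.writeInt(cell.length);
      out.write(cell);
      return 4 + cell.length;
    }
    public void endBlockEncoding(DataOutputStream out) { /* per-block finalization would go here */ }
  }

  public static void main(String[] args) throws IOException {
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    CellEncoder encoder = new PassThroughEncoder();
    encoder.startBlockEncoding(out);                  // once per block
    int unencoded = 0;
    for (byte[] cell : new byte[][] { { 1, 2 }, { 3, 4, 5 } }) {
      unencoded += encoder.encode(cell, out);         // once per KeyValue
    }
    encoder.endBlockEncoding(out);                    // finalize the block
    System.out.println("unencoded size: " + unencoded);
  }
}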
/**
* Decode.

View File

@ -75,130 +75,6 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
}
private void compressSingleKeyValue(DiffCompressionState previousState,
DiffCompressionState currentState, DataOutputStream out,
ByteBuffer in) throws IOException {
byte flag = 0;
int kvPos = in.position();
int keyLength = in.getInt();
int valueLength = in.getInt();
long timestamp;
long diffTimestamp = 0;
int diffTimestampFitsInBytes = 0;
int commonPrefix;
int timestampFitsInBytes;
if (previousState.isFirst()) {
currentState.readKey(in, keyLength, valueLength);
currentState.prevOffset = kvPos;
timestamp = currentState.timestamp;
if (timestamp < 0) {
flag |= FLAG_TIMESTAMP_SIGN;
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
commonPrefix = 0;
// put column family
in.mark();
ByteBufferUtils.skip(in, currentState.rowLength
+ KeyValue.ROW_LENGTH_SIZE);
ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
+ KeyValue.FAMILY_LENGTH_SIZE);
in.reset();
} else {
// find a common prefix and skip it
commonPrefix =
ByteBufferUtils.findCommonPrefix(in, in.position(),
previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
- KeyValue.TIMESTAMP_TYPE_SIZE);
// don't compress timestamp and type using prefix
currentState.readKey(in, keyLength, valueLength,
commonPrefix, previousState);
currentState.prevOffset = kvPos;
timestamp = currentState.timestamp;
boolean negativeTimestamp = timestamp < 0;
if (negativeTimestamp) {
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
if (keyLength == previousState.keyLength) {
flag |= FLAG_SAME_KEY_LENGTH;
}
if (valueLength == previousState.valueLength) {
flag |= FLAG_SAME_VALUE_LENGTH;
}
if (currentState.type == previousState.type) {
flag |= FLAG_SAME_TYPE;
}
// encode timestamp
diffTimestamp = previousState.timestamp - currentState.timestamp;
boolean minusDiffTimestamp = diffTimestamp < 0;
if (minusDiffTimestamp) {
diffTimestamp = -diffTimestamp;
}
diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
if (diffTimestampFitsInBytes < timestampFitsInBytes) {
flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
flag |= FLAG_TIMESTAMP_IS_DIFF;
if (minusDiffTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
} else {
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
if (negativeTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
}
}
out.write(flag);
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, keyLength);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, valueLength);
}
ByteBufferUtils.putCompressedInt(out, commonPrefix);
ByteBufferUtils.skip(in, commonPrefix);
if (previousState.isFirst() ||
commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
int restRowLength =
currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
ByteBufferUtils.skip(in, currentState.familyLength +
KeyValue.FAMILY_LENGTH_SIZE);
ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
} else {
ByteBufferUtils.moveBufferToStream(out, in,
keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
}
if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
} else {
ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
}
if ((flag & FLAG_SAME_TYPE) == 0) {
out.write(currentState.type);
}
ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
ByteBufferUtils.moveBufferToStream(out, in, valueLength);
}
private void uncompressSingleKeyValue(DataInputStream source,
ByteBuffer buffer,
DiffCompressionState state)
@ -316,24 +192,110 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
public void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());
DiffCompressionState previousState = new DiffCompressionState();
DiffCompressionState currentState = new DiffCompressionState();
while (in.hasRemaining()) {
compressSingleKeyValue(previousState, currentState,
out, in);
afterEncodingKeyValue(in, out, encodingCtx);
// swap previousState <-> currentState
DiffCompressionState tmp = previousState;
previousState = currentState;
currentState = tmp;
}
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
DataOutputStream out) throws IOException {
EncodingState state = encodingContext.getEncodingState();
int size = compressSingleKeyValue(out, kv, state.prevKv);
size += afterEncodingKeyValue(kv, out, encodingContext);
state.prevKv = kv;
return size;
}
private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
throws IOException {
byte flag = 0;
int kLength = kv.getKeyLength();
int vLength = kv.getValueLength();
long timestamp;
long diffTimestamp = 0;
int diffTimestampFitsInBytes = 0;
int timestampFitsInBytes;
int commonPrefix;
byte[] curKvBuf = kv.getBuffer();
if (prevKv == null) {
timestamp = kv.getTimestamp();
if (timestamp < 0) {
flag |= FLAG_TIMESTAMP_SIGN;
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
commonPrefix = 0;
// put column family
byte familyLength = kv.getFamilyLength();
out.write(familyLength);
out.write(kv.getFamilyArray(), kv.getFamilyOffset(), familyLength);
} else {
// Finding common prefix
int preKeyLength = prevKv.getKeyLength();
commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
- KeyValue.TIMESTAMP_TYPE_SIZE, prevKv.getBuffer(), prevKv.getKeyOffset(), preKeyLength
- KeyValue.TIMESTAMP_TYPE_SIZE);
if (kLength == preKeyLength) {
flag |= FLAG_SAME_KEY_LENGTH;
}
if (vLength == prevKv.getValueLength()) {
flag |= FLAG_SAME_VALUE_LENGTH;
}
if (kv.getTypeByte() == prevKv.getTypeByte()) {
flag |= FLAG_SAME_TYPE;
}
// Don't include the timestamp and type in the common prefix; encode the timestamp below.
timestamp = kv.getTimestamp();
diffTimestamp = prevKv.getTimestamp() - timestamp;
boolean negativeTimestamp = timestamp < 0;
if (negativeTimestamp) {
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
boolean minusDiffTimestamp = diffTimestamp < 0;
if (minusDiffTimestamp) {
diffTimestamp = -diffTimestamp;
}
diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
if (diffTimestampFitsInBytes < timestampFitsInBytes) {
flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
flag |= FLAG_TIMESTAMP_IS_DIFF;
if (minusDiffTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
} else {
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
if (negativeTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
}
}
out.write(flag);
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, kLength);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, vLength);
}
ByteBufferUtils.putCompressedInt(out, commonPrefix);
if (prevKv == null || commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
int restRowLength = kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restRowLength);
out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
} else {
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kLength - commonPrefix
- KeyValue.TIMESTAMP_TYPE_SIZE);
}
if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
} else {
ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
}
if ((flag & FLAG_SAME_TYPE) == 0) {
out.write(kv.getTypeByte());
}
out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
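The timestamp handling above stores whichever of the absolute timestamp or its delta from the previous cell fits in fewer bytes, recording the byte count and a sign bit in the flag byte. A small sketch of just that size comparison, assuming a helper equivalent to ByteBufferUtils.longFitsIn(); the real encoder also folds the result into the flag byte.

// Illustrative only: choose the shorter of the absolute timestamp or its diff, simplified flags.
public class TimestampDiffSketch {
  // Roughly equivalent to ByteBufferUtils.longFitsIn(): minimum bytes needed to hold the value.
  static int bytesNeeded(long value) {
    int bytes = 1;
    while ((value >>>= 8) != 0) {
      bytes++;
    }
    return bytes;
  }

  public static void main(String[] args) {
    long prevTs = 1400000000123L;
    long curTs = 1400000000456L;

    long diff = prevTs - curTs;
    boolean diffNegative = diff < 0;
    if (diffNegative) {
      diff = -diff;
    }
    int absoluteBytes = bytesNeeded(curTs); // 6 bytes for a current epoch-millis value
    int diffBytes = bytesNeeded(diff);      // 2 bytes here, so the diff wins

    if (diffBytes < absoluteBytes) {
      System.out.println("store " + diffBytes + "-byte diff, sign flag=" + diffNegative);
    } else {
      System.out.println("store " + absoluteBytes + "-byte absolute timestamp");
    }
  }
}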
@Override
public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@ -36,13 +37,16 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Encapsulates a data block compressed using a particular encoding algorithm.
* Useful for testing and benchmarking.
* This is used only in testing.
*/
@InterfaceAudience.Private
@VisibleForTesting
public class EncodedDataBlock {
private byte[] rawKVs;
private ByteBuffer rawBuffer;
@ -215,16 +219,53 @@ public class EncodedDataBlock {
* @return encoded data block with header and checksum
*/
public byte[] encodeData() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
this.dataBlockEncoder.encodeKeyValues(
getUncompressedBuffer(), encodingCtx);
baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
DataOutputStream out = new DataOutputStream(baos);
this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
ByteBuffer in = getUncompressedBuffer();
in.rewind();
int klength, vlength;
short tagsLength = 0;
long memstoreTS = 0L;
KeyValue kv = null;
while (in.hasRemaining()) {
int kvOffset = in.position();
klength = in.getInt();
vlength = in.getInt();
ByteBufferUtils.skip(in, klength + vlength);
if (this.meta.isIncludesTags()) {
tagsLength = in.getShort();
ByteBufferUtils.skip(in, tagsLength);
}
if (this.meta.isIncludesMvcc()) {
memstoreTS = ByteBufferUtils.readVLong(in);
}
kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
klength, vlength, tagsLength));
kv.setMvccVersion(memstoreTS);
this.dataBlockEncoder.encode(kv, encodingCtx, out);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
baos.writeTo(stream);
this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.buf);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in encoding part of algorithm %s. " +
"Probably it requested more bytes than are available.",
toString()), e);
}
return encodingCtx.getUncompressedBytesWithHeader();
return baos.toByteArray();
}
private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
private byte[] buf;
@Override
public void write(byte[] b, int off, int len) {
this.buf = b;
}
}
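BufferGrabbingByteArrayOutputStream relies on the fact that ByteArrayOutputStream.writeTo() hands its internal array to write(byte[], int, int), so overriding that method captures the buffer without the extra copy toByteArray() would make. A standalone sketch of the idiom (JDK only; note that the grabbed array can be longer than the valid byte count):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative only: grab a ByteArrayOutputStream's internal buffer without the toByteArray() copy.
public class BufferGrabSketch {
  static class BufferGrabbingStream extends ByteArrayOutputStream {
    private byte[] grabbed;
    @Override
    public void write(byte[] b, int off, int len) {
      // writeTo() passes the source stream's internal array straight through; just keep the reference.
      this.grabbed = b;
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream source = new ByteArrayOutputStream();
    source.write("encoded block bytes".getBytes(StandardCharsets.UTF_8));

    BufferGrabbingStream grabber = new BufferGrabbingStream();
    source.writeTo(grabber);      // no copy: grabber now references source's internal buffer
    byte[] raw = grabber.grabbed; // capacity of the buffer, not necessarily the valid length
    System.out.println("grabbed capacity: " + raw.length + ", valid bytes: " + source.size());
  }
}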
@Override

View File

@ -0,0 +1,34 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
/**
* Keeps track of the encoding state.
*/
@InterfaceAudience.Private
public class EncodingState {
/**
* The previous KeyValue the encoder encoded.
*/
protected KeyValue prevKv = null;
}
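EncodingState exists purely to carry the previously encoded KeyValue from one encode() call to the next, which is what lets the delta encoders diff against the prior key without re-reading the source buffer. A tiny sketch of the pattern, with a plain byte[] standing in for the KeyValue:

import java.util.Arrays;

// Illustrative only: an encoder that remembers the previous key in a small state object.
public class PrevKeyStateSketch {
  static class State {
    byte[] prevKey; // analogous to EncodingState.prevKv
  }

  // Returns how many leading bytes of 'key' can be omitted because the previous key shares them.
  static int reusablePrefix(State state, byte[] key) {
    int common = 0;
    if (state.prevKey != null) {
      int limit = Math.min(state.prevKey.length, key.length);
      while (common < limit && state.prevKey[common] == key[common]) {
        common++;
      }
    }
    state.prevKey = key; // remember for the next call, as internalEncode() does
    return common;
  }

  public static void main(String[] args) {
    State state = new State();
    byte[][] keys = { { 10, 20, 30 }, { 10, 20, 31 }, { 10, 99 } };
    for (byte[] key : keys) {
      System.out.println(Arrays.toString(key) + " -> reusable prefix " + reusablePrefix(state, key));
    }
  }
}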

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
@ -102,118 +101,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
private void compressSingleKeyValue(
FastDiffCompressionState previousState,
FastDiffCompressionState currentState,
OutputStream out, ByteBuffer in) throws IOException {
currentState.prevOffset = in.position();
int keyLength = in.getInt();
int valueOffset =
currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
int valueLength = in.getInt();
byte flag = 0;
if (previousState.isFirst()) {
// copy the key, there is no common prefix with none
out.write(flag);
ByteBufferUtils.putCompressedInt(out, keyLength);
ByteBufferUtils.putCompressedInt(out, valueLength);
ByteBufferUtils.putCompressedInt(out, 0);
currentState.readKey(in, keyLength, valueLength);
ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
} else {
// find a common prefix and skip it
int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
previousState.prevOffset + KeyValue.ROW_OFFSET,
Math.min(keyLength, previousState.keyLength) -
KeyValue.TIMESTAMP_TYPE_SIZE);
currentState.readKey(in, keyLength, valueLength,
commonPrefix, previousState);
if (keyLength == previousState.keyLength) {
flag |= FLAG_SAME_KEY_LENGTH;
}
if (valueLength == previousState.valueLength) {
flag |= FLAG_SAME_VALUE_LENGTH;
}
if (currentState.type == previousState.type) {
flag |= FLAG_SAME_TYPE;
}
int commonTimestampPrefix = findCommonTimestampPrefix(
currentState, previousState);
flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
// Check if current and previous values are the same. Compare value
// length first as an optimization.
if (valueLength == previousState.valueLength) {
int previousValueOffset = previousState.prevOffset
+ previousState.keyLength + KeyValue.ROW_OFFSET;
if (ByteBufferUtils.arePartsEqual(in,
previousValueOffset, previousState.valueLength,
valueOffset, valueLength)) {
flag |= FLAG_SAME_VALUE;
}
}
out.write(flag);
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, keyLength);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, valueLength);
}
ByteBufferUtils.putCompressedInt(out, commonPrefix);
ByteBufferUtils.skip(in, commonPrefix);
if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
// Previous and current rows are different. Copy the differing part of
// the row, skip the column family, and copy the qualifier.
ByteBufferUtils.moveBufferToStream(out, in,
currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
ByteBufferUtils.skip(in, currentState.familyLength +
KeyValue.FAMILY_LENGTH_SIZE);
ByteBufferUtils.moveBufferToStream(out, in,
currentState.qualifierLength);
} else {
// The common part includes the whole row. As the column family is the
// same across the whole file, it will automatically be included in the
// common prefix, so we need not special-case it here.
int restKeyLength = keyLength - commonPrefix -
KeyValue.TIMESTAMP_TYPE_SIZE;
ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
}
ByteBufferUtils.skip(in, commonTimestampPrefix);
ByteBufferUtils.moveBufferToStream(out, in,
KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
// Write the type if it is not the same as before.
if ((flag & FLAG_SAME_TYPE) == 0) {
out.write(currentState.type);
}
// Write the value if it is not the same as before.
if ((flag & FLAG_SAME_VALUE) == 0) {
ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
}
// Skip key type and value in the input buffer.
ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
private int findCommonTimestampPrefix(byte[] curKvBuf, int curKvTsOff, byte[] preKvBuf,
int preKvTsOff) {
int commonPrefix = 0;
while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
&& curKvBuf[curKvTsOff + commonPrefix] == preKvBuf[preKvTsOff + commonPrefix]) {
commonPrefix++;
}
}
private int findCommonTimestampPrefix(FastDiffCompressionState left,
FastDiffCompressionState right) {
int prefixTimestamp = 0;
while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
left.timestamp[prefixTimestamp]
== right.timestamp[prefixTimestamp]) {
prefixTimestamp++;
}
return prefixTimestamp; // has to be at most 7 bytes
return commonPrefix; // has to be at most 7 bytes
}
private void uncompressSingleKeyValue(DataInputStream source,
@ -342,22 +237,98 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in,
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());
FastDiffCompressionState previousState = new FastDiffCompressionState();
FastDiffCompressionState currentState = new FastDiffCompressionState();
while (in.hasRemaining()) {
compressSingleKeyValue(previousState, currentState,
out, in);
afterEncodingKeyValue(in, out, encodingCtx);
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
DataOutputStream out) throws IOException {
EncodingState state = encodingContext.getEncodingState();
int size = compressSingleKeyValue(out, kv, state.prevKv);
size += afterEncodingKeyValue(kv, out, encodingContext);
state.prevKv = kv;
return size;
}
// swap previousState <-> currentState
FastDiffCompressionState tmp = previousState;
previousState = currentState;
currentState = tmp;
private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
throws IOException {
byte flag = 0;
int kLength = kv.getKeyLength();
int vLength = kv.getValueLength();
byte[] curKvBuf = kv.getBuffer();
if (prevKv == null) {
// Copy the whole key; there is no previous KeyValue, so there is no common prefix.
out.write(flag);
ByteBufferUtils.putCompressedInt(out, kLength);
ByteBufferUtils.putCompressedInt(out, vLength);
ByteBufferUtils.putCompressedInt(out, 0);
out.write(curKvBuf, kv.getKeyOffset(), kLength + vLength);
} else {
byte[] preKvBuf = prevKv.getBuffer();
int preKeyLength = prevKv.getKeyLength();
int preValLength = prevKv.getValueLength();
// find a common prefix and skip it
int commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
- KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset(), preKeyLength
- KeyValue.TIMESTAMP_TYPE_SIZE);
if (kLength == prevKv.getKeyLength()) {
flag |= FLAG_SAME_KEY_LENGTH;
}
if (vLength == prevKv.getValueLength()) {
flag |= FLAG_SAME_VALUE_LENGTH;
}
if (kv.getTypeByte() == prevKv.getTypeByte()) {
flag |= FLAG_SAME_TYPE;
}
int commonTimestampPrefix = findCommonTimestampPrefix(curKvBuf, kv.getKeyOffset() + kLength
- KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset() + preKeyLength
- KeyValue.TIMESTAMP_TYPE_SIZE);
flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
// Check if current and previous values are the same. Compare value
// length first as an optimization.
if (vLength == preValLength
&& Bytes.equals(kv.getValueArray(), kv.getValueOffset(), vLength,
prevKv.getValueArray(), prevKv.getValueOffset(), preValLength)) {
flag |= FLAG_SAME_VALUE;
}
out.write(flag);
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, kLength);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, vLength);
}
ByteBufferUtils.putCompressedInt(out, commonPrefix);
if (commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
// Previous and current rows are different. Copy the differing part of
// the row, skip the column family, and copy the qualifier.
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kv.getRowLength()
+ KeyValue.ROW_LENGTH_SIZE - commonPrefix);
out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
} else {
// The common part includes the whole row. As the column family is the
// same across the whole file, it will automatically be included in the
// common prefix, so we need not special-case it here.
int restKeyLength = kLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restKeyLength);
}
out.write(curKvBuf, kv.getKeyOffset() + kLength - KeyValue.TIMESTAMP_TYPE_SIZE
+ commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
// Write the type if it is not the same as before.
if ((flag & FLAG_SAME_TYPE) == 0) {
out.write(kv.getTypeByte());
}
// Write the value if it is not the same as before.
if ((flag & FLAG_SAME_VALUE) == 0) {
out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
}
}
return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
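findCommonTimestampPrefix() exploits the fact that timestamps of adjacent cells usually share their high-order bytes, so only the trailing bytes of the 8-byte big-endian timestamp are written (at least one, since the prefix is capped at 7). A self-contained sketch of that comparison, assuming big-endian serialization as in the KeyValue key:

// Illustrative only: count shared leading bytes of two big-endian 8-byte timestamps.
public class TimestampPrefixSketch {
  static byte[] toBigEndian(long ts) {
    byte[] b = new byte[8];
    for (int i = 7; i >= 0; i--) {
      b[i] = (byte) ts;
      ts >>>= 8;
    }
    return b;
  }

  // Capped at 7, so at least one timestamp byte is always written for the current cell.
  static int commonTimestampPrefix(byte[] cur, byte[] prev) {
    int common = 0;
    while (common < 7 && cur[common] == prev[common]) {
      common++;
    }
    return common;
  }

  public static void main(String[] args) {
    byte[] prev = toBigEndian(1400000000123L);
    byte[] cur = toBigEndian(1400000000456L);
    int common = commonTimestampPrefix(cur, prev);
    // Only (8 - common) timestamp bytes need to go to the output stream.
    System.out.println("shared leading bytes: " + common + ", bytes to write: " + (8 - common));
  }
}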
@Override

View File

@ -49,13 +49,9 @@ import com.google.common.base.Preconditions;
public class HFileBlockDefaultEncodingContext implements
HFileBlockEncodingContext {
private byte[] onDiskBytesWithHeader;
private byte[] uncompressedBytesWithHeader;
private BlockType blockType;
private final DataBlockEncoding encodingAlgo;
private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
private DataOutputStream dataOut = new DataOutputStream(encodedStream);
private byte[] dummyHeader;
// Compression state
@ -77,6 +73,8 @@ public class HFileBlockDefaultEncodingContext implements
/** Initialization vector */
private byte[] iv;
private EncodingState encoderState;
/**
* @param encoding encoding used
* @param headerBytes dummy header bytes
@ -113,52 +111,35 @@ public class HFileBlockDefaultEncodingContext implements
"Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
}
@Override
public void setDummyHeader(byte[] headerBytes) {
dummyHeader = headerBytes;
}
/**
* prepare to start a new encoding.
* @throws IOException
*/
public void prepareEncoding() throws IOException {
encodedStream.reset();
dataOut.write(dummyHeader);
if (encodingAlgo != null
&& encodingAlgo != DataBlockEncoding.NONE) {
encodingAlgo.writeIdInBytes(dataOut);
public void prepareEncoding(DataOutputStream out) throws IOException {
if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
encodingAlgo.writeIdInBytes(out);
}
}
@Override
public void postEncoding(BlockType blockType)
throws IOException {
dataOut.flush();
compressAfterEncodingWithBlockType(encodedStream.toByteArray(), blockType);
this.blockType = blockType;
}
/**
* @param uncompressedBytesWithHeader
* @param blockType
* @throws IOException
*/
public void compressAfterEncodingWithBlockType(byte[] uncompressedBytesWithHeader,
BlockType blockType) throws IOException {
compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
@Override
public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
return onDiskBytesWithHeader;
}
/**
* @param uncompressedBytesWithHeader
* @param blockType
* @param headerBytes
* @throws IOException
*/
protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
BlockType blockType, byte[] headerBytes) throws IOException {
this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
throws IOException {
Encryption.Context cryptoContext = fileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
@ -238,20 +219,7 @@ public class HFileBlockDefaultEncodingContext implements
} else {
onDiskBytesWithHeader = uncompressedBytesWithHeader;
}
}
this.blockType = blockType;
}
@Override
public byte[] getOnDiskBytesWithHeader() {
return onDiskBytesWithHeader;
}
@Override
public byte[] getUncompressedBytesWithHeader() {
return uncompressedBytesWithHeader;
}
@Override
@ -271,10 +239,6 @@ public class HFileBlockDefaultEncodingContext implements
}
}
public DataOutputStream getOutputStreamForEncoder() {
return this.dataOut;
}
@Override
public DataBlockEncoding getDataBlockEncoding() {
return this.encodingAlgo;
@ -292,4 +256,14 @@ public class HFileBlockDefaultEncodingContext implements
public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
this.tagCompressionContext = tagCompressionContext;
}
@Override
public EncodingState getEncodingState() {
return this.encoderState;
}
@Override
public void setEncodingState(EncodingState state) {
this.encoderState = state;
}
}
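compressAndEncrypt() now takes the fully encoded bytes (header included) and produces the on-disk form, compressing and/or encrypting only the payload after the header. The sketch below shows that shape with java.util.zip, which is purely illustrative; HBase goes through its own Compression and Encryption codecs, and the 33-byte header size is just an assumed value.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Illustrative only: keep a fixed-size header as-is and compress only the payload behind it.
public class CompressAfterHeaderSketch {
  static byte[] compressAndKeepHeader(byte[] uncompressedWithHeader, int headerSize)
      throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(uncompressedWithHeader, 0, headerSize); // header bytes stay uncompressed
    DeflaterOutputStream deflate =
        new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_SPEED));
    deflate.write(uncompressedWithHeader, headerSize,
        uncompressedWithHeader.length - headerSize);   // compress only the payload
    deflate.finish();
    return baos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] block = new byte[33 + 4096]; // assumed 33-byte header plus payload, for illustration
    byte[] onDisk = compressAndKeepHeader(block, 33);
    System.out.println("uncompressed: " + block.length + ", on disk: " + onDisk.length);
  }
}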

View File

@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@ -33,33 +32,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
@InterfaceAudience.Private
public interface HFileBlockEncodingContext {
/**
* @return OutputStream to which encoded data is written
*/
OutputStream getOutputStreamForEncoder();
/**
* @return encoded and compressed bytes with header which are ready to write
* out to disk
*/
byte[] getOnDiskBytesWithHeader();
/**
* @return encoded but not heavily compressed bytes with header which can be
* cached in block cache
*/
byte[] getUncompressedBytesWithHeader();
/**
* @return the block type after encoding
*/
BlockType getBlockType();
/**
* sets the dummy header bytes
*/
void setDummyHeader(byte[] headerBytes);
/**
* @return the {@link DataBlockEncoding} encoding used
*/
@ -83,4 +60,22 @@ public interface HFileBlockEncodingContext {
* @return HFile context information
*/
HFileContext getHFileContext();
/**
* Sets the encoding state.
* @param state
*/
void setEncodingState(EncodingState state);
/**
* @return the encoding state
*/
EncodingState getEncodingState();
/**
* @param uncompressedBytesWithHeader encoded bytes with header
* @return Bytes with header which are ready to write out to disk. This is compressed and
* encrypted bytes applying the set compression algorithm and encryption.
*/
byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
}

View File

@ -44,50 +44,33 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
private int addKV(int prevKeyOffset, DataOutputStream out,
ByteBuffer in, int prevKeyLength) throws IOException {
int keyLength = in.getInt();
int valueLength = in.getInt();
if (prevKeyOffset == -1) {
@Override
public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
DataOutputStream out) throws IOException {
byte[] kvBuf = kv.getBuffer();
int klength = kv.getKeyLength();
int vlength = kv.getValueLength();
EncodingState state = encodingContext.getEncodingState();
if (state.prevKv == null) {
// Copy the whole key; there is no previous KeyValue, so there is no common prefix.
ByteBufferUtils.putCompressedInt(out, keyLength);
ByteBufferUtils.putCompressedInt(out, valueLength);
ByteBufferUtils.putCompressedInt(out, klength);
ByteBufferUtils.putCompressedInt(out, vlength);
ByteBufferUtils.putCompressedInt(out, 0);
ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
out.write(kvBuf, kv.getKeyOffset(), klength + vlength);
} else {
// find a common prefix and skip it
int common = ByteBufferUtils.findCommonPrefix(
in, prevKeyOffset + KeyValue.ROW_OFFSET,
in.position(),
Math.min(prevKeyLength, keyLength));
ByteBufferUtils.putCompressedInt(out, keyLength - common);
ByteBufferUtils.putCompressedInt(out, valueLength);
int common = ByteBufferUtils.findCommonPrefix(state.prevKv.getBuffer(),
state.prevKv.getKeyOffset(), state.prevKv.getKeyLength(), kvBuf, kv.getKeyOffset(),
kv.getKeyLength());
ByteBufferUtils.putCompressedInt(out, klength - common);
ByteBufferUtils.putCompressedInt(out, vlength);
ByteBufferUtils.putCompressedInt(out, common);
ByteBufferUtils.skip(in, common);
ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
+ valueLength);
}
return keyLength;
}
@Override
public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in,
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
in.rewind();
ByteBufferUtils.putInt(writeHere, in.limit());
int prevOffset = -1;
int offset = 0;
int keyLength = 0;
while (in.hasRemaining()) {
offset = in.position();
keyLength = addKV(prevOffset, writeHere, in, keyLength);
afterEncodingKeyValue(in, writeHere, encodingCtx);
prevOffset = offset;
out.write(kvBuf, kv.getKeyOffset() + common, klength - common + vlength);
}
int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
size += afterEncodingKeyValue(kv, out, encodingContext);
state.prevKv = kv;
return size;
}
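For intuition, here is a prefix-delta round trip for a single key: the encoder stores only the common-prefix length and the differing suffix, and the decoder rebuilds the key from the previous one. Plain ints are used instead of the compressed ints in the real code, and the value bytes are omitted.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: prefix-delta encode and decode of one key against the previous key.
public class PrefixDeltaRoundTripSketch {
  public static void main(String[] args) throws IOException {
    byte[] prevKey = "row0001/cf:qual1".getBytes(StandardCharsets.UTF_8);
    byte[] curKey = "row0001/cf:qual2".getBytes(StandardCharsets.UTF_8);

    // Encode: rest-of-key length, common-prefix length, then only the differing suffix.
    int common = 0;
    while (common < Math.min(prevKey.length, curKey.length) && prevKey[common] == curKey[common]) {
      common++;
    }
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(curKey.length - common);
    out.writeInt(common);
    out.write(curKey, common, curKey.length - common);

    // Decode: rebuild the key from the previous key's prefix plus the stored suffix.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    int restLen = in.readInt();
    int commonLen = in.readInt();
    byte[] rebuilt = new byte[commonLen + restLen];
    System.arraycopy(prevKey, 0, rebuilt, 0, commonLen);
    in.readFully(rebuilt, commonLen, restLen);

    System.out.println(Arrays.equals(rebuilt, curKey)); // true
  }
}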
@Override

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.encode.PrefixTreeEncoder;
import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellSearcher;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.io.WritableUtils;
/**
* This class is created via reflection in DataBlockEncoding enum. Update the enum if class name or
@ -63,50 +65,6 @@ public class PrefixTreeCodec implements DataBlockEncoder{
public PrefixTreeCodec() {
}
/**
* Copied from BufferedDataBlockEncoder. Almost definitely can be improved, but i'm not familiar
* enough with the concept of the HFileBlockEncodingContext.
*/
@Override
public void encodeKeyValues(ByteBuffer in,
HFileBlockEncodingContext blkEncodingCtx) throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException(this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx
= (HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding();
DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
internalEncodeKeyValues(dataOut, in, encodingCtx.getHFileContext().isIncludesMvcc(),
encodingCtx.getHFileContext().isIncludesTags());
//do i need to check this, or will it always be DataBlockEncoding.PREFIX_TREE?
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
private void internalEncodeKeyValues(DataOutputStream encodedOutputStream,
ByteBuffer rawKeyValues, boolean includesMvccVersion, boolean includesTag) throws IOException {
rawKeyValues.rewind();
PrefixTreeEncoder builder = EncoderFactory.checkOut(encodedOutputStream, includesMvccVersion);
try {
KeyValue kv;
while ((kv = KeyValueUtil.nextShallowCopy(rawKeyValues, includesMvccVersion, includesTag)) != null) {
builder.write(kv);
}
builder.flush();
} finally {
EncoderFactory.checkIn(builder);
}
}
@Override
public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
throws IOException {
@ -202,4 +160,54 @@ public class PrefixTreeCodec implements DataBlockEncoder{
return new PrefixTreeSeeker(decodingCtx.getHFileContext().isIncludesMvcc());
}
@Override
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
PrefixTreeEncoder builder = state.builder;
builder.write(kv);
int size = kv.getLength();
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
size += WritableUtils.getVIntSize(kv.getMvccVersion());
}
return size;
}
private static class PrefixTreeEncodingState extends EncodingState {
PrefixTreeEncoder builder = null;
}
@Override
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException(this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding(out);
PrefixTreeEncoder builder = EncoderFactory.checkOut(out, encodingCtx.getHFileContext()
.isIncludesMvcc());
PrefixTreeEncodingState state = new PrefixTreeEncodingState();
state.builder = builder;
blkEncodingCtx.setEncodingState(state);
}
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException {
PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
PrefixTreeEncoder builder = state.builder;
builder.flush();
EncoderFactory.checkIn(builder);
// Do I need to check this, or will it always be DataBlockEncoding.PREFIX_TREE?
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
}
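The prefix-tree codec keeps a heavyweight PrefixTreeEncoder per block: startBlockEncoding() checks one out of a pool and stashes it in the block's EncodingState, encode() feeds it cells, and endBlockEncoding() flushes it and checks it back in. A minimal JDK-only sketch of that pooled-builder lifecycle (Builder, checkOut and checkIn here are illustrative stand-ins):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: check a reusable builder out per block and return it when the block ends.
public class PooledBuilderSketch {
  static class Builder {
    private DataOutputStream out;
    void reset(DataOutputStream out) { this.out = out; }
    void write(byte[] cell) throws IOException { out.write(cell); }
    void flush() throws IOException { out.flush(); }
  }

  // The real EncoderPoolImpl uses a LinkedBlockingQueue so concurrent writers can share the pool.
  private static final Deque<Builder> POOL = new ArrayDeque<Builder>();

  static Builder checkOut(DataOutputStream out) {
    Builder b = POOL.poll();
    if (b == null) {
      b = new Builder();
    }
    b.reset(out); // startBlockEncoding(): bind the pooled builder to this block's stream
    return b;
  }

  static void checkIn(Builder b) {
    POOL.add(b);  // endBlockEncoding(): make the builder available for the next block
  }

  public static void main(String[] args) throws IOException {
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    Builder builder = checkOut(out);        // the real codec stashes this in the encoding state
    builder.write(new byte[] { 1, 2, 3 });  // encode() delegates each cell to the builder
    builder.flush();
    checkIn(builder);                       // block finished: return the builder to the pool
  }
}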

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class EncoderFactory {
private static final EncoderPool POOL = new ThreadLocalEncoderPool();
private static final EncoderPool POOL = new EncoderPoolImpl();
public static PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.codec.prefixtree.encode;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class EncoderPoolImpl implements EncoderPool {
private BlockingQueue<PrefixTreeEncoder> unusedEncoders =
new LinkedBlockingQueue<PrefixTreeEncoder>();
@Override
public PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {
PrefixTreeEncoder encoder = unusedEncoders.poll();
if (encoder == null) {
encoder = new PrefixTreeEncoder(outputStream, includeMvccVersion);
} else {
encoder.reset(outputStream, includeMvccVersion);
}
return encoder;
}
@Override
public void checkIn(PrefixTreeEncoder encoder) {
this.unusedEncoders.add(encoder);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
@ -641,6 +642,10 @@ public class HFileBlock implements Cacheable {
*/
private DataOutputStream userDataStream;
// Size of the actual (unencoded, uncompressed) data written so far, including the header size.
private int unencodedDataSizeWritten;
/**
* Bytes to be written to the file system, including the header. Compressed
* if compression is turned on. It also includes the checksum data that
@ -731,9 +736,24 @@ public class HFileBlock implements Cacheable {
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
if (newBlockType == BlockType.DATA) {
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
}
this.unencodedDataSizeWritten = 0;
return userDataStream;
}
/**
* Writes the kv to this block
* @param kv
* @throws IOException
*/
public void write(KeyValue kv) throws IOException{
expectState(State.WRITING);
this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx,
this.userDataStream);
}
/**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
@ -750,7 +770,7 @@ public class HFileBlock implements Cacheable {
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
private void ensureBlockReady() throws IOException {
void ensureBlockReady() throws IOException {
Preconditions.checkState(state != State.INIT,
"Unexpected state: " + state);
@ -768,6 +788,14 @@ public class HFileBlock implements Cacheable {
* write state to "block ready".
*/
private void finishBlock() throws IOException {
if (blockType == BlockType.DATA) {
BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
new BufferGrabbingByteArrayOutputStream();
baosInMemory.writeTo(baosInMemoryCopy);
this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
baosInMemoryCopy.buf, blockType);
blockType = dataBlockEncodingCtx.getBlockType();
}
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
@ -777,15 +805,13 @@ public class HFileBlock implements Cacheable {
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
if (blockType == BlockType.DATA) {
encodeDataBlockForDisk();
if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
onDiskBytesWithHeader = dataBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
} else {
defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
uncompressedBytesWithHeader, blockType);
onDiskBytesWithHeader =
defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
onDiskBytesWithHeader = defaultBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
}
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBytesWithHeader.length,
fileContext.getBytesPerChecksum());
@ -805,24 +831,17 @@ public class HFileBlock implements Cacheable {
onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
}
/**
* Encodes this block if it is a data block and encoding is turned on in
* {@link #dataBlockEncoder}.
*/
private void encodeDataBlockForDisk() throws IOException {
// do data block encoding, if data block encoder is set
ByteBuffer rawKeyValues =
ByteBuffer.wrap(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE,
uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE).slice();
public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
private byte[] buf;
// do the encoding
dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
@Override
public void write(byte[] b, int off, int len) {
this.buf = b;
}
uncompressedBytesWithHeader =
dataBlockEncodingCtx.getUncompressedBytesWithHeader();
onDiskBytesWithHeader =
dataBlockEncodingCtx.getOnDiskBytesWithHeader();
blockType = dataBlockEncodingCtx.getBlockType();
public byte[] getBuffer() {
return this.buf;
}
}
/**
@ -873,7 +892,7 @@ public class HFileBlock implements Cacheable {
* @param out the output stream to write the
* @throws IOException
*/
private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
@ -972,9 +991,8 @@ public class HFileBlock implements Cacheable {
* @return the number of bytes written
*/
public int blockSizeWritten() {
if (state != State.WRITING)
return 0;
return userDataStream.size();
if (state != State.WRITING) return 0;
return this.unencodedDataSizeWritten;
}
/**

View File

@ -16,10 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
@ -36,18 +37,38 @@ public interface HFileDataBlockEncoder {
byte[] DATA_BLOCK_ENCODING = Bytes.toBytes("DATA_BLOCK_ENCODING");
/**
* Should be called before an encoded or unencoded data block is written to
* disk.
* @param in KeyValues next to each other
* @param encodingResult the encoded result
* @param blockType block type
* Starts encoding for a block of KeyValues. Call
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[], BlockType)}
* to finish encoding of a block.
* @param encodingCtx
* @param out
* @throws IOException
*/
void beforeWriteToDisk(
ByteBuffer in,
HFileBlockEncodingContext encodingResult,
BlockType blockType
) throws IOException;
void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;
/**
* Encodes a KeyValue.
* @param kv
* @param encodingCtx
* @param out
* @return unencoded kv size
* @throws IOException
*/
int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;
/**
* Ends encoding for a block of KeyValues. Gives the encoder a chance to finalize the encoded
* block. It must be called at the end of block encoding.
* @param encodingCtx
* @param out
* @param uncompressedBytesWithHeader
* @param blockType
* @throws IOException
*/
void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException;
/**
* Decides whether we should use a scanner over encoded blocks.

View File

@ -16,10 +16,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@ -89,24 +90,11 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
}
return encoding;
}
/**
* Precondition: a non-encoded buffer. Postcondition: on-disk encoding.
*
* The encoded results can be stored in {@link HFileBlockEncodingContext}.
*
* @throws IOException
*/
@Override
public void beforeWriteToDisk(ByteBuffer in,
HFileBlockEncodingContext encodeCtx,
BlockType blockType) throws IOException {
if (encoding == DataBlockEncoding.NONE) {
// there is no need to encode the block before writing it to disk
((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncodingWithBlockType(
in.array(), blockType);
return;
}
encodeBufferToHFileBlockBuffer(in, encoding, encodeCtx);
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
return this.encoding.getEncoder().encode(kv, encodingCtx, out);
}
@Override
@ -114,26 +102,6 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
return encoding != DataBlockEncoding.NONE;
}
/**
* Encode a block of key value pairs.
*
* @param in input data to encode
* @param algo encoding algorithm
* @param encodeCtx where will the output data be stored
*/
private void encodeBufferToHFileBlockBuffer(ByteBuffer in, DataBlockEncoding algo,
HFileBlockEncodingContext encodeCtx) {
DataBlockEncoder encoder = algo.getEncoder();
try {
encoder.encodeKeyValues(in, encodeCtx);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in data block encoder "
+ "'%s', it probably requested too much data, " +
"exception message: %s.",
algo.toString(), e.getMessage()), e);
}
}
@Override
public String toString() {
@ -158,4 +126,18 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
}
return new HFileBlockDefaultDecodingContext(fileContext);
}
@Override
public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
if (this.encoding != null && this.encoding != DataBlockEncoding.NONE) {
this.encoding.getEncoder().startBlockEncoding(encodingCtx, out);
}
}
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
this.encoding.getEncoder().endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
}
}

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Writes HFile format version 2.
@ -251,8 +250,37 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final KeyValue kv) throws IOException {
append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
byte[] key = kv.getBuffer();
int koffset = kv.getKeyOffset();
int klength = kv.getKeyLength();
byte[] value = kv.getValueArray();
int voffset = kv.getValueOffset();
int vlength = kv.getValueLength();
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting())
newBlock();
fsBlockWriter.write(kv);
totalKeyLength += klength;
totalValueLength += vlength;
// Are we the first key in this block?
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
}
@ -268,59 +296,14 @@ public class HFileWriterV2 extends AbstractHFileWriter {
*/
@Override
public void append(final byte[] key, final byte[] value) throws IOException {
append(0, key, 0, key.length, value, 0, value.length);
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
*
* @param key
* @param koffset
* @param klength
* @param value
* @param voffset
* @param vlength
* @throws IOException
*/
protected void append(final long memstoreTS, final byte[] key, final int koffset,
final int klength, final byte[] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting())
newBlock();
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
if (this.hFileContext.isIncludesMvcc()) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
// Are we the first key in this block?
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
byte[] b = new byte[kvlen];
int pos = 0;
pos = Bytes.putInt(b, pos, key.length);
pos = Bytes.putInt(b, pos, value.length);
pos = Bytes.putBytes(b, pos, key, 0, key.length);
Bytes.putBytes(b, pos, value, 0, value.length);
append(new KeyValue(b, 0, kvlen));
}
@Override

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
* {@link HFile} writer for version 3.
@ -86,10 +84,11 @@ public class HFileWriterV3 extends HFileWriterV2 {
@Override
public void append(final KeyValue kv) throws IOException {
// Currently get the complete arrays
append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), kv.getTagsArray(),
kv.getTagsOffset(), kv.getTagsLength());
this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
super.append(kv);
short tagsLength = kv.getTagsLength();
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}
}
/**
@ -119,73 +118,20 @@ public class HFileWriterV3 extends HFileWriterV2 {
*/
@Override
public void append(final byte[] key, final byte[] value, byte[] tag) throws IOException {
append(0, key, 0, key.length, value, 0, value.length, tag, 0, tag.length);
int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, tag.length);
byte[] b = new byte[kvlen];
int pos = 0;
pos = Bytes.putInt(b, pos, key.length);
pos = Bytes.putInt(b, pos, value.length);
pos = Bytes.putBytes(b, pos, key, 0, key.length);
pos = Bytes.putBytes(b, pos, value, 0, value.length);
if (tag.length > 0) {
pos = Bytes.putShort(b, pos, (short) tag.length);
Bytes.putBytes(b, pos, tag, 0, tag.length);
}
append(new KeyValue(b, 0, kvlen));
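The flat buffer assembled above is the standard KeyValue serialization: a 4-byte key length, a 4-byte value length, the key bytes, the value bytes, and, when tags are present, a 2-byte tags length followed by the tag bytes. A hedged, illustrative helper (not part of the patch; the name flatKeyValueLength is an assumption) that walks that layout with the same Bytes utility used above:
static int flatKeyValueLength(byte[] b, boolean hasTags) {
  int pos = 0;
  int klen = Bytes.toInt(b, pos); pos += Bytes.SIZEOF_INT;   // key length
  int vlen = Bytes.toInt(b, pos); pos += Bytes.SIZEOF_INT;   // value length
  pos += klen + vlen;                                        // key bytes, then value bytes
  if (hasTags) {
    short tagsLen = Bytes.toShort(b, pos);
    pos += Bytes.SIZEOF_SHORT + tagsLen;                     // tags length prefix, then tags
  }
  return pos;                                                // total serialized size
}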
}
/**
* Add key/value to file. Keys must be added in an order that agrees with the
* Comparator passed on construction.
* @param key
* @param koffset
* @param klength
* @param value
* @param voffset
* @param vlength
* @param tag
* @param tagsOffset
* @param tagLength
* @throws IOException
*/
private void append(final long memstoreTS, final byte[] key, final int koffset,
final int klength, final byte[] value, final int voffset, final int vlength,
final byte[] tag, final int tagsOffset, final int tagsLength) throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
if (!fsBlockWriter.isWriting())
newBlock();
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
// Write the additional tag into the stream
if (hFileContext.isIncludesTags()) {
out.writeShort((short) tagsLength);
if (tagsLength > 0) {
out.write(tag, tagsOffset, tagsLength);
if (tagsLength > maxTagsLength) {
maxTagsLength = tagsLength;
}
}
}
if (this.hFileContext.isIncludesMvcc()) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
// Are we the first key in this block?
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
}
protected void finishFileInfo() throws IOException {
super.finishFileInfo();
if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {

View File

@ -16,15 +16,17 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.io.WritableUtils;
/**
* Does not perform any kind of encoding/decoding.
@ -40,18 +42,30 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
}
@Override
public void beforeWriteToDisk(ByteBuffer in,
HFileBlockEncodingContext encodeCtx, BlockType blockType)
public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
if (!(encodeCtx.getClass().getName().equals(
HFileBlockDefaultEncodingContext.class.getName()))) {
throw new IOException (this.getClass().getName() + " only accepts " +
HFileBlockDefaultEncodingContext.class.getName() + ".");
}
int klength = kv.getKeyLength();
int vlength = kv.getValueLength();
HFileBlockDefaultEncodingContext defaultContext =
(HFileBlockDefaultEncodingContext) encodeCtx;
defaultContext.compressAfterEncodingWithBlockType(in.array(), blockType);
out.writeInt(klength);
out.writeInt(vlength);
out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
// Write the additional tag into the stream
if (encodingCtx.getHFileContext().isIncludesTags()) {
short tagsLength = kv.getTagsLength();
out.writeShort(tagsLength);
if (tagsLength > 0) {
out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
}
encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
WritableUtils.writeVLong(out, kv.getMvccVersion());
encodedKvSize += WritableUtils.getVIntSize(kv.getMvccVersion());
}
return encodedKvSize;
}
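For reference, a hedged sketch (not part of the patch; readOneUnencodedCell is a hypothetical name) of reading one cell back in the same field order this method writes: key/value lengths, key, value, then the optional tags section and the optional mvcc vlong.
static void readOneUnencodedCell(DataInputStream in, boolean includesTags,
    boolean includesMvcc) throws IOException {
  int klen = in.readInt();
  int vlen = in.readInt();
  byte[] key = new byte[klen];
  byte[] value = new byte[vlen];
  in.readFully(key);
  in.readFully(value);
  if (includesTags) {
    short tagsLen = in.readShort();           // the length is written even when zero
    byte[] tags = new byte[tagsLen];
    in.readFully(tags);                       // no-op for a zero-length array
  }
  if (includesMvcc) {
    long mvcc = WritableUtils.readVLong(in);  // mirrors the vlong written above
  }
}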
@Override
@ -88,4 +102,15 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
return new HFileBlockDefaultDecodingContext(meta);
}
@Override
public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
}
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
encodingCtx.postEncoding(BlockType.DATA);
}
}

View File

@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;
@ -91,48 +94,6 @@ public class TestDataBlockEncoders {
HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
}
}
private byte[] encodeBytes(DataBlockEncoding encoding, ByteBuffer dataset)
throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
HFileBlockEncodingContext encodingCtx = getEncodingContext(Compression.Algorithm.NONE,
encoding);
encoder.encodeKeyValues(dataset, encodingCtx);
byte[] encodedBytesWithHeader = encodingCtx.getUncompressedBytesWithHeader();
byte[] encodedData = new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData, 0,
encodedData.length);
return encodedData;
}
private void testAlgorithm(ByteBuffer dataset, DataBlockEncoding encoding,
List<KeyValue> kvList) throws IOException {
// encode
byte[] encodedBytes = encodeBytes(encoding, dataset);
// decode
ByteArrayInputStream bais = new ByteArrayInputStream(encodedBytes);
DataInputStream dis = new DataInputStream(bais);
ByteBuffer actualDataset;
DataBlockEncoder encoder = encoding.getEncoder();
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTags)
.withCompression(Compression.Algorithm.NONE).build();
actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
dataset.rewind();
actualDataset.rewind();
// this is because in case of prefix tree the decoded stream will not have
// the
// mvcc in it.
// if (encoding != DataBlockEncoding.PREFIX_TREE) {
assertEquals("Encoding -> decoding gives different results for " + encoder,
Bytes.toStringBinary(dataset), Bytes.toStringBinary(actualDataset));
// }
}
/**
* Test data block encoding of empty KeyValue.
@ -158,8 +119,7 @@ public class TestDataBlockEncoders {
kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
metaValue2) }));
}
testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
kvList);
testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
}
/**
@ -186,8 +146,7 @@ public class TestDataBlockEncoders {
kvList.add(new KeyValue(row, family, qualifier, -1l, Type.Put, value));
kvList.add(new KeyValue(row, family, qualifier, -2l, Type.Put, value));
}
testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
kvList);
testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
}
@ -199,8 +158,7 @@ public class TestDataBlockEncoders {
@Test
public void testExecutionOnSample() throws IOException {
List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
kvList);
testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
}
/**
@ -209,18 +167,17 @@ public class TestDataBlockEncoders {
@Test
public void testSeekingOnSample() throws IOException {
List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS);
// create all seekers
List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<DataBlockEncoder.EncodedSeeker>();
List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
new ArrayList<DataBlockEncoder.EncodedSeeker>();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
if (encoding.getEncoder() == null) {
DataBlockEncoder encoder = encoding.getEncoder();
if (encoder == null) {
continue;
}
ByteBuffer encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
@ -258,25 +215,35 @@ public class TestDataBlockEncoders {
}
}
static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
HFileBlockEncodingContext encodingContext) throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
DataOutputStream dos = new DataOutputStream(baos);
encoder.startBlockEncoding(encodingContext, dos);
for (KeyValue kv : kvs) {
encoder.encode(kv, encodingContext, dos);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
baos.writeTo(stream);
encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
return ByteBuffer.wrap(encodedData);
}
@Test
public void testNextOnSample() {
public void testNextOnSample() throws IOException {
List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS);
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
if (encoding.getEncoder() == null) {
continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = null;
try {
encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
} catch (IOException e) {
throw new RuntimeException(String.format("Bug while encoding using '%s'",
encoder.toString()), e);
}
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
@ -318,25 +285,19 @@ public class TestDataBlockEncoders {
/**
* Test whether the decompression of first key is implemented correctly.
* @throws IOException
*/
@Test
public void testFirstKeyInBlockOnSample() {
public void testFirstKeyInBlockOnSample() throws IOException {
List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS);
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
if (encoding.getEncoder() == null) {
continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = null;
try {
encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
} catch (IOException e) {
throw new RuntimeException(String.format("Bug while encoding using '%s'",
encoder.toString()), e);
}
ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
getEncodingContext(Compression.Algorithm.NONE, encoding));
ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
KeyValue firstKv = sampleKv.get(0);
if (0 != Bytes.compareTo(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.limit(),
@ -360,9 +321,7 @@ public class TestDataBlockEncoders {
ByteBuffer expectedKey = null;
ByteBuffer expectedValue = null;
for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
seeker.seekToKeyInBlock(
new KeyValue.KeyOnlyKeyValue(keyValue.getBuffer(), keyValue.getKeyOffset(), keyValue
.getKeyLength()), seekBefore);
seeker.seekToKeyInBlock(keyValue, seekBefore);
seeker.rewind();
ByteBuffer actualKeyValue = seeker.getKeyValueBuffer();
@ -388,24 +347,34 @@ public class TestDataBlockEncoders {
}
}
}
private void testEncodersOnDataset(ByteBuffer onDataset, List<KeyValue> kvList) throws IOException {
ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
onDataset.rewind();
dataset.put(onDataset);
onDataset.rewind();
dataset.flip();
private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
boolean includesTags) throws IOException {
ByteBuffer unencodedDataBuf = RedundantKVGenerator.convertKvToByteBuffer(kvList,
includesMemstoreTS);
HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTags).build();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
if (encoding.getEncoder() == null) {
DataBlockEncoder encoder = encoding.getEncoder();
if (encoder == null) {
continue;
}
HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
testAlgorithm(dataset, encoding, kvList);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
DataOutputStream dos = new DataOutputStream(baos);
encoder.startBlockEncoding(encodingContext, dos);
for (KeyValue kv : kvList) {
encoder.encode(kv, encodingContext, dos);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
baos.writeTo(stream);
encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
byte[] encodedData = baos.toByteArray();
// ensure that dataset is unchanged
dataset.rewind();
assertEquals("Input of two methods is changed", onDataset, dataset);
testAlgorithm(encodedData, unencodedDataBuf, encoder);
}
}
@ -427,8 +396,26 @@ public class TestDataBlockEncoders {
kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
}
testEncodersOnDataset(RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS),
kvList);
testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
}
private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
DataBlockEncoder encoder) throws IOException {
// decode
ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
encodedData.length - ENCODED_DATA_OFFSET);
DataInputStream dis = new DataInputStream(bais);
ByteBuffer actualDataset;
HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTags)
.withCompression(Compression.Algorithm.NONE).build();
actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
actualDataset.rewind();
// this is because in case of prefix tree the decoded stream will not have the mvcc in it.
assertEquals("Encoding -> decoding gives different results for " + encoder,
Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
}
}

View File

@ -98,7 +98,6 @@ public class TestPrefixTreeEncoding {
formatRowNum = true;
PrefixTreeCodec encoder = new PrefixTreeCodec();
int batchId = numBatchesWritten++;
ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, false, includesTag);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(false)
@ -106,10 +105,13 @@ public class TestPrefixTreeEncoding {
.withCompression(Algorithm.NONE).build();
HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
generateFixedTestData(kvset, batchId, false, includesTag, encoder, blkEncodingCtx,
userDataStream);
EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
seeker.setCurrentBuffer(readBuffer);
@ -142,7 +144,8 @@ public class TestPrefixTreeEncoding {
@Test
public void testScanWithRandomData() throws Exception {
PrefixTreeCodec encoder = new PrefixTreeCodec();
ByteBuffer dataBuffer = generateRandomTestData(kvset, numBatchesWritten++, includesTag);
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(false)
@ -151,10 +154,11 @@ public class TestPrefixTreeEncoding {
.build();
HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
generateRandomTestData(kvset, numBatchesWritten++, includesTag, encoder, blkEncodingCtx,
userDataStream);
EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
seeker.setCurrentBuffer(readBuffer);
@ -178,8 +182,9 @@ public class TestPrefixTreeEncoding {
@Test
public void testSeekWithRandomData() throws Exception {
PrefixTreeCodec encoder = new PrefixTreeCodec();
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
int batchId = numBatchesWritten++;
ByteBuffer dataBuffer = generateRandomTestData(kvset, batchId, includesTag);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(false)
@ -188,10 +193,10 @@ public class TestPrefixTreeEncoding {
.build();
HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
generateRandomTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
verifySeeking(seeker, readBuffer, batchId);
@ -201,7 +206,6 @@ public class TestPrefixTreeEncoding {
public void testSeekWithFixedData() throws Exception {
PrefixTreeCodec encoder = new PrefixTreeCodec();
int batchId = numBatchesWritten++;
ByteBuffer dataBuffer = generateFixedTestData(kvset, batchId, includesTag);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(false)
@ -210,10 +214,12 @@ public class TestPrefixTreeEncoding {
.build();
HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
encoder.encodeKeyValues(dataBuffer, blkEncodingCtx);
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
generateFixedTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
byte[] onDiskBytes = blkEncodingCtx.getOnDiskBytesWithHeader();
byte[] onDiskBytes = baosInMemory.toByteArray();
ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
onDiskBytes.length - DataBlockEncoding.ID_SIZE);
verifySeeking(seeker, readBuffer, batchId);
@ -255,15 +261,15 @@ public class TestPrefixTreeEncoding {
}
}
private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
int batchId, boolean useTags) throws Exception {
return generateFixedTestData(kvset, batchId, true, useTags);
private static void generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset, int batchId,
boolean useTags, PrefixTreeCodec encoder, HFileBlockEncodingContext blkEncodingCtx,
DataOutputStream userDataStream) throws Exception {
generateFixedTestData(kvset, batchId, true, useTags, encoder, blkEncodingCtx, userDataStream);
}
private static ByteBuffer generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
int batchId, boolean partial, boolean useTags) throws Exception {
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
private static void generateFixedTestData(ConcurrentSkipListSet<KeyValue> kvset,
int batchId, boolean partial, boolean useTags, PrefixTreeCodec encoder,
HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
if (partial && i / 10 % 2 == 1)
continue;
@ -279,24 +285,16 @@ public class TestPrefixTreeEncoding {
}
}
}
encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
for (KeyValue kv : kvset) {
userDataStream.writeInt(kv.getKeyLength());
userDataStream.writeInt(kv.getValueLength());
userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
userDataStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
if (useTags) {
userDataStream.writeShort(kv.getTagsLength());
userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
+ Bytes.SIZEOF_SHORT, kv.getTagsLength());
}
encoder.encode(kv, blkEncodingCtx, userDataStream);
}
return ByteBuffer.wrap(baosInMemory.toByteArray());
encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
}
private static ByteBuffer generateRandomTestData(ConcurrentSkipListSet<KeyValue> kvset,
int batchId, boolean useTags) throws Exception {
ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
private static void generateRandomTestData(ConcurrentSkipListSet<KeyValue> kvset,
int batchId, boolean useTags, PrefixTreeCodec encoder,
HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
Random random = new Random();
for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
if (random.nextInt(100) < 50)
@ -315,19 +313,11 @@ public class TestPrefixTreeEncoding {
}
}
}
encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
for (KeyValue kv : kvset) {
userDataStream.writeInt(kv.getKeyLength());
userDataStream.writeInt(kv.getValueLength());
userDataStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
userDataStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
if (useTags) {
userDataStream.writeShort(kv.getTagsLength());
userDataStream.write(kv.getBuffer(), kv.getValueOffset() + kv.getValueLength()
+ Bytes.SIZEOF_SHORT, kv.getTagsLength());
}
encoder.encode(kv, blkEncodingCtx, userDataStream);
}
return ByteBuffer.wrap(baosInMemory.toByteArray());
encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
}
private static byte[] getRowKey(int batchId, int i) {

View File

@ -32,30 +32,12 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.test.RedundantKVGenerator;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestSeekToBlockWithEncoders {
private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
+ DataBlockEncoding.ID_SIZE;
private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo,
DataBlockEncoding encoding) {
DataBlockEncoder encoder = encoding.getEncoder();
HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(false)
.withIncludesTags(false).withCompression(algo).build();
if (encoder != null) {
return encoder
.newDataBlockEncodingContext(encoding, HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
} else {
return new HFileBlockDefaultEncodingContext(encoding, HConstants.HFILEBLOCK_DUMMY_HEADER,
meta);
}
}
/**
* Test seeking while file is encoded.
*/
@ -77,10 +59,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv5 = new KeyValue(Bytes.toBytes("bba"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("val"));
sampleKv.add(kv5);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("val"));
seekToTheKey(kv4, originalBuffer, toSeek);
seekToTheKey(kv4, sampleKv, toSeek);
}
/**
@ -104,10 +85,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("val"));
sampleKv.add(kv5);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("val"));
seekToTheKey(kv1, originalBuffer, toSeek);
seekToTheKey(kv1, sampleKv, toSeek);
}
/**
@ -131,10 +111,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
Bytes.toBytes("val"));
sampleKv.add(kv5);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("bbbce"), Bytes.toBytes("f1"),
Bytes.toBytes("q1"), Bytes.toBytes("val"));
seekToTheKey(kv5, originalBuffer, toSeek);
seekToTheKey(kv5, sampleKv, toSeek);
}
/**
@ -155,10 +134,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv4 = new KeyValue(Bytes.toBytes("row11baa"), Bytes.toBytes("f1"),
Bytes.toBytes("q1"), Bytes.toBytes("val"));
sampleKv.add(kv4);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = KeyValueUtil.createLastOnRow(kv3.getRowArray(), kv3.getRowOffset(),
kv3.getRowLength(), null, 0, 0, null, 0, 0);
seekToTheKey(kv3, originalBuffer, toSeek);
seekToTheKey(kv3, sampleKv, toSeek);
}
@Test
@ -176,10 +154,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv5 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q2"),
Bytes.toBytes("val"));
sampleKv.add(kv5);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q2"),
Bytes.toBytes("val"));
seekToTheKey(kv5, originalBuffer, toSeek);
seekToTheKey(kv5, sampleKv, toSeek);
}
@Test
@ -200,10 +177,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
Bytes.toBytes("val"));
sampleKv.add(kv6);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
Bytes.toBytes("val"));
seekToTheKey(kv6, originalBuffer, toSeek);
seekToTheKey(kv6, sampleKv, toSeek);
}
@Test
@ -224,10 +200,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("z5"),
Bytes.toBytes("val"));
sampleKv.add(kv6);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q5"),
Bytes.toBytes("val"));
seekToTheKey(kv5, originalBuffer, toSeek);
seekToTheKey(kv5, sampleKv, toSeek);
}
@Test
@ -248,10 +223,9 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv6 = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("qz"),
Bytes.toBytes("val"));
sampleKv.add(kv6);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("qz"),
Bytes.toBytes("val"));
seekToTheKey(kv6, originalBuffer, toSeek);
seekToTheKey(kv6, sampleKv, toSeek);
}
@Test
@ -269,27 +243,28 @@ public class TestSeekToBlockWithEncoders {
KeyValue kv5 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("fam1"), Bytes.toBytes("q2"),
Bytes.toBytes("val"));
sampleKv.add(kv5);
ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, false);
KeyValue toSeek = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("fam2"),
Bytes.toBytes("q2"), Bytes.toBytes("val"));
seekToTheKey(kv5, originalBuffer, toSeek);
seekToTheKey(kv5, sampleKv, toSeek);
}
private void seekToTheKey(KeyValue expected, ByteBuffer originalBuffer, KeyValue toSeek)
private void seekToTheKey(KeyValue expected, List<KeyValue> kvs, KeyValue toSeek)
throws IOException {
// create all seekers
List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
new ArrayList<DataBlockEncoder.EncodedSeeker>();
List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<DataBlockEncoder.EncodedSeeker>();
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
if (encoding.getEncoder() == null || encoding == DataBlockEncoding.PREFIX_TREE) {
continue;
}
ByteBuffer encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
DataBlockEncoder encoder = encoding.getEncoder();
HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
.withIncludesMvcc(false).withIncludesTags(false)
.withCompression(Compression.Algorithm.NONE).build();
HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
encodingContext);
DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
encoder.newDataBlockDecodingContext(meta));
seeker.setCurrentBuffer(encodedBuffer);
@ -311,17 +286,4 @@ public class TestSeekToBlockWithEncoders {
seeker.rewind();
}
}
private byte[] encodeBytes(DataBlockEncoding encoding, ByteBuffer dataset) throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
HFileBlockEncodingContext encodingCtx = getEncodingContext(Compression.Algorithm.NONE, encoding);
encoder.encodeKeyValues(dataset, encodingCtx);
byte[] encodedBytesWithHeader = encodingCtx.getUncompressedBytesWithHeader();
byte[] encodedData = new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData, 0,
encodedData.length);
return encodedData;
}
}

View File

@ -55,13 +55,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@ -83,8 +79,7 @@ public class TestHFileBlock {
private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
NONE, GZ };
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
private static final int NUM_TEST_BLOCKS = 1000;
private static final int NUM_READER_THREADS = 26;
@ -94,10 +89,8 @@ public class TestHFileBlock {
private static int FIELD_LENGTH = 10;
private static float CHANCE_TO_REPEAT = 0.6f;
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private FileSystem fs;
private int uncompressedSizeV1;
private final boolean includesMemstoreTS;
private final boolean includesTag;
@ -122,8 +115,8 @@ public class TestHFileBlock {
dos.writeInt(i / 100);
}
static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS, boolean useTag)
throws IOException {
static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
boolean useTag) throws IOException {
List<KeyValue> keyValues = new ArrayList<KeyValue>();
Random randomizer = new Random(42l + seed); // just any fixed number
@ -177,26 +170,16 @@ public class TestHFileBlock {
// sort it and write to stream
int totalSize = 0;
Collections.sort(keyValues, KeyValue.COMPARATOR);
DataOutputStream dataOutputStream = new DataOutputStream(dos);
Collections.sort(keyValues, KeyValue.COMPARATOR);
for (KeyValue kv : keyValues) {
dataOutputStream.writeInt(kv.getKeyLength());
dataOutputStream.writeInt(kv.getValueLength());
dataOutputStream.write(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
dataOutputStream.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
// Write the additional tag into the stream
// always write the taglength
totalSize += kv.getLength();
if (useTag) {
dataOutputStream.writeShort(kv.getTagsLength());
dataOutputStream.write(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
}
if (includesMemstoreTS) {
long memstoreTS = randomizer.nextLong();
WritableUtils.writeVLong(dataOutputStream, memstoreTS);
kv.setMvccVersion(memstoreTS);
totalSize += WritableUtils.getVIntSize(memstoreTS);
}
hbw.write(kv);
}
return totalSize;
}
@ -209,7 +192,6 @@ public class TestHFileBlock {
DataOutputStream dos = new DataOutputStream(os);
BlockType.META.write(dos); // Let's make this a meta block.
writeTestBlockContents(dos);
uncompressedSizeV1 = dos.size();
dos.flush();
algo.returnCompressor(compressor);
return baos.toByteArray();
@ -229,7 +211,7 @@ public class TestHFileBlock {
DataOutputStream dos = hbw.startWriting(blockType);
writeTestBlockContents(dos);
dos.flush();
byte[] headerAndData = hbw.getHeaderAndDataForTest();
hbw.ensureBlockReady();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.release();
return hbw;
@ -383,8 +365,8 @@ public class TestHFileBlock {
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo + "_" + encoding.toString());
FSDataOutputStream os = fs.create(path);
HFileDataBlockEncoder dataBlockEncoder =
new HFileDataBlockEncoderImpl(encoding);
HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
HFileContext meta = new HFileContextBuilder()
.withCompression(algo)
.withIncludesMvcc(includesMemstoreTS)
@ -392,16 +374,30 @@ public class TestHFileBlock {
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
.build();
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder,
meta);
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
blockId, includesMemstoreTS, HConstants.HFILEBLOCK_DUMMY_HEADER, includesTag);
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
final int encodedSize = encodedResultWithHeader.length - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
encodedBlocks.add(encodedBuf);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
@ -438,8 +434,7 @@ public class TestHFileBlock {
expectedBuffer.rewind();
// test if content matches, produce nice message
assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding,
pread);
assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
}
is.close();
}
@ -447,60 +442,6 @@ public class TestHFileBlock {
}
}
static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
DataOutputStream dos, final List<Integer> encodedSizes,
final List<ByteBuffer> encodedBlocks, int blockId,
boolean includesMemstoreTS, byte[] dummyHeader, boolean useTag) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DoubleOutputStream doubleOutputStream =
new DoubleOutputStream(dos, baos);
writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS, useTag);
ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
rawBuf.rewind();
DataBlockEncoder encoder = encoding.getEncoder();
int headerLen = dummyHeader.length;
byte[] encodedResultWithHeader = null;
HFileContext meta = new HFileContextBuilder()
.withCompression(algo)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(useTag)
.build();
if (encoder != null) {
HFileBlockEncodingContext encodingCtx = encoder.newDataBlockEncodingContext(encoding,
dummyHeader, meta);
encoder.encodeKeyValues(rawBuf, encodingCtx);
encodedResultWithHeader =
encodingCtx.getUncompressedBytesWithHeader();
} else {
HFileBlockDefaultEncodingContext defaultEncodingCtx = new HFileBlockDefaultEncodingContext(
encoding, dummyHeader, meta);
byte[] rawBufWithHeader =
new byte[rawBuf.array().length + headerLen];
System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
headerLen, rawBuf.array().length);
defaultEncodingCtx.compressAfterEncodingWithBlockType(rawBufWithHeader,
BlockType.DATA);
encodedResultWithHeader =
defaultEncodingCtx.getUncompressedBytesWithHeader();
}
final int encodedSize =
encodedResultWithHeader.length - headerLen;
if (encoder != null) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
encodedBlocks.add(encodedBuf);
}
static void assertBuffersEqual(ByteBuffer expectedBuffer,
ByteBuffer actualBuffer, Compression.Algorithm compression,
DataBlockEncoding encoding, boolean pread) {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.Test;
@ -67,21 +69,13 @@ import com.google.common.base.Preconditions;
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestHFileBlockCompatibility {
// change this value to activate more logs
private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
NONE, GZ };
// The minor version for pre-checksum files
private static int MINOR_VERSION = 0;
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private HFileSystem fs;
private int uncompressedSizeV1;
private final boolean includesMemstoreTS;
private final boolean includesTag;
@ -109,7 +103,6 @@ public class TestHFileBlockCompatibility {
DataOutputStream dos = new DataOutputStream(os);
BlockType.META.write(dos); // Let's make this a meta block.
TestHFileBlock.writeTestBlockContents(dos);
uncompressedSizeV1 = dos.size();
dos.flush();
algo.returnCompressor(compressor);
return baos.toByteArray();
@ -259,8 +252,8 @@ public class TestHFileBlockCompatibility {
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo + "_" + encoding.toString());
FSDataOutputStream os = fs.create(path);
HFileDataBlockEncoder dataBlockEncoder =
new HFileDataBlockEncoderImpl(encoding);
HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
TestHFileBlockCompatibility.Writer hbw =
new TestHFileBlockCompatibility.Writer(algo,
dataBlockEncoder, includesMemstoreTS, includesTag);
@ -268,12 +261,25 @@ public class TestHFileBlockCompatibility {
final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
encodedBlocks, blockId, includesMemstoreTS,
TestHFileBlockCompatibility.Writer.DUMMY_HEADER, includesTag);
hbw.startWriting(BlockType.DATA);
TestHFileBlock.writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
final int encodedSize = encodedResultWithHeader.length - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
encodedBlocks.add(encodedBuf);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
@ -329,7 +335,7 @@ public class TestHFileBlockCompatibility {
* in this class but the code in HFileBlock.Writer will continually
* evolve.
*/
public static final class Writer {
public static final class Writer extends HFileBlock.Writer {
// These constants are as they were in minorVersion 0.
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
@ -408,34 +414,31 @@ public class TestHFileBlockCompatibility {
/** The offset of the previous block of the same type */
private long prevOffset;
private HFileContext meta;
private int unencodedDataSizeWritten;
/**
* @param compressionAlgorithm compression algorithm to use
* @param dataBlockEncoderAlgo data block encoding algorithm to use
*/
public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
this.dataBlockEncoder = dataBlockEncoder != null
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
.withCompression(compressionAlgorithm).build());
}
meta = new HFileContextBuilder()
.withHBaseCheckSum(false)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(compressionAlgorithm)
.build();
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
super(dataBlockEncoder, meta);
compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
: NoOpDataBlockEncoder.INSTANCE;
defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
dataBlockEncodingCtx =
this.dataBlockEncoder.newDataBlockEncodingContext(
DUMMY_HEADER, meta);
dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
baosInMemory = new ByteArrayOutputStream();
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
prevOffsetByType[i] = -1;
}
/**
@ -462,9 +465,22 @@ public class TestHFileBlockCompatibility {
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
if (newBlockType == BlockType.DATA) {
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
}
this.unencodedDataSizeWritten = 0;
return userDataStream;
}
public void write(KeyValue kv) throws IOException {
expectState(State.WRITING);
this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
this.unencodedDataSizeWritten += kv.getLength();
if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getMvccVersion());
}
}
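A hedged usage sketch of this test Writer's new per-KeyValue path, mirroring testDataBlockEncoding() above; os, meta, dataBlockEncoder and keyValues are assumed to be set up as in that test, and this snippet is illustration rather than patch code.
Writer hbw = new Writer(dataBlockEncoder, meta);
hbw.startWriting(BlockType.DATA);
for (KeyValue kv : keyValues) {
  hbw.write(kv);                      // routes the cell through dataBlockEncoder.encode()
}
// blockSizeWritten() now reflects the unencoded bytes tracked in write(), not the stream size
int unencodedBytes = hbw.blockSizeWritten();
hbw.writeHeaderAndData(os);           // finishes the block and writes header + data to os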
/**
* Returns the stream for the user to write to. The block writer takes care
* of handling compression and buffering for caching on write. Can only be
@ -481,7 +497,7 @@ public class TestHFileBlockCompatibility {
* Transitions the block writer from the "writing" state to the "block
* ready" state. Does nothing if a block is already finished.
*/
private void ensureBlockReady() throws IOException {
void ensureBlockReady() throws IOException {
Preconditions.checkState(state != State.INIT,
"Unexpected state: " + state);
@ -498,7 +514,12 @@ public class TestHFileBlockCompatibility {
* uncompressed stream for caching on write, if applicable. Sets block
* write state to "block ready".
*/
private void finishBlock() throws IOException {
void finishBlock() throws IOException {
if (blockType == BlockType.DATA) {
this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
baosInMemory.toByteArray(), blockType);
blockType = dataBlockEncodingCtx.getBlockType();
}
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
@ -508,13 +529,12 @@ public class TestHFileBlockCompatibility {
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
if (blockType == BlockType.DATA) {
encodeDataBlockForDisk();
if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
onDiskBytesWithHeader = dataBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
} else {
defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
uncompressedBytesWithHeader, blockType);
onDiskBytesWithHeader =
defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
onDiskBytesWithHeader = defaultBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
}
// put the header for on disk bytes
@ -527,26 +547,6 @@ public class TestHFileBlockCompatibility {
uncompressedBytesWithHeader.length);
}
/**
* Encodes this block if it is a data block and encoding is turned on in
* {@link #dataBlockEncoder}.
*/
private void encodeDataBlockForDisk() throws IOException {
// do data block encoding, if data block encoder is set
ByteBuffer rawKeyValues =
ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
//do the encoding
dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
uncompressedBytesWithHeader =
dataBlockEncodingCtx.getUncompressedBytesWithHeader();
onDiskBytesWithHeader =
dataBlockEncodingCtx.getOnDiskBytesWithHeader();
blockType = dataBlockEncodingCtx.getBlockType();
}
/**
* Put the header into the given byte array at the given offset.
* @param onDiskSize size of the block on disk
@ -676,7 +676,7 @@ public class TestHFileBlockCompatibility {
public int blockSizeWritten() {
if (state != State.WRITING)
return 0;
return userDataStream.size();
return this.unencodedDataSizeWritten;
}
/**

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -26,12 +28,14 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.test.RedundantKVGenerator;
import org.junit.Test;
@ -43,7 +47,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestHFileDataBlockEncoder {
private HFileDataBlockEncoderImpl blockEncoder;
private HFileDataBlockEncoder blockEncoder;
private RedundantKVGenerator generator = new RedundantKVGenerator();
private boolean includesMemstoreTS;
@ -51,7 +55,7 @@ public class TestHFileDataBlockEncoder {
* Create test for given data block encoding configuration.
* @param blockEncoder What kind of encoding policy will be used.
*/
public TestHFileDataBlockEncoder(HFileDataBlockEncoderImpl blockEncoder,
public TestHFileDataBlockEncoder(HFileDataBlockEncoder blockEncoder,
boolean includesMemstoreTS) {
this.blockEncoder = blockEncoder;
this.includesMemstoreTS = includesMemstoreTS;
@ -70,8 +74,9 @@ public class TestHFileDataBlockEncoder {
}
private void testEncodingWithCacheInternals(boolean useTag) throws IOException {
HFileBlock block = getSampleHFileBlock(useTag);
HFileBlock cacheBlock = createBlockOnDisk(block, useTag);
List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
HFileBlock block = getSampleHFileBlock(kvs, useTag);
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTag);
LruBlockCache blockCache =
new LruBlockCache(8 * 1024 * 1024, 32 * 1024);
@ -105,8 +110,8 @@ public class TestHFileDataBlockEncoder {
private void testHeaderSizeInCacheWithoutChecksumInternals(boolean useTags) throws IOException {
int headerSize = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
// Create some KVs and create the block with old-style header.
ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
generator.generateTestKeyValues(60, useTags), includesMemstoreTS);
List<KeyValue> kvs = generator.generateTestKeyValues(60, useTags);
ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(kvs, includesMemstoreTS);
int size = keyValues.limit();
ByteBuffer buf = ByteBuffer.allocate(size + headerSize);
buf.position(headerSize);
@ -121,24 +126,10 @@ public class TestHFileDataBlockEncoder {
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, 0,
0, hfileContext);
HFileBlock cacheBlock = createBlockOnDisk(block, useTags);
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
}
private HFileBlock createBlockOnDisk(HFileBlock block, boolean useTags) throws IOException {
int size;
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
blockEncoder.getDataBlockEncoding(),
HConstants.HFILEBLOCK_DUMMY_HEADER, block.getHFileContext());
context.setDummyHeader(block.getDummyHeaderForVersion());
blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), context, block.getBlockType());
byte[] encodedBytes = context.getUncompressedBytesWithHeader();
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
return new HFileBlock(context.getBlockType(), size, size, -1,
ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0,
block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
}
/**
* Test encoding.
* @throws IOException
@ -151,8 +142,9 @@ public class TestHFileDataBlockEncoder {
private void testEncodingInternals(boolean useTag) throws IOException {
// usually we have just the block without headers, but don't complicate that
HFileBlock block = getSampleHFileBlock(useTag);
HFileBlock blockOnDisk = createBlockOnDisk(block, useTag);
List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
HFileBlock block = getSampleHFileBlock(kvs, useTag);
HFileBlock blockOnDisk = createBlockOnDisk(kvs, block, useTag);
if (blockEncoder.getDataBlockEncoding() !=
DataBlockEncoding.NONE) {
@ -164,9 +156,8 @@ public class TestHFileDataBlockEncoder {
}
}
private HFileBlock getSampleHFileBlock(boolean useTag) {
ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(
generator.generateTestKeyValues(60, useTag), includesMemstoreTS);
private HFileBlock getSampleHFileBlock(List<KeyValue> kvs, boolean useTag) {
ByteBuffer keyValues = RedundantKVGenerator.convertKvToByteBuffer(kvs, includesMemstoreTS);
int size = keyValues.limit();
ByteBuffer buf = ByteBuffer.allocate(size + HConstants.HFILEBLOCK_HEADER_SIZE);
buf.position(HConstants.HFILEBLOCK_HEADER_SIZE);
@ -186,6 +177,29 @@ public class TestHFileDataBlockEncoder {
return b;
}
private HFileBlock createBlockOnDisk(List<KeyValue> kvs, HFileBlock block, boolean useTags)
throws IOException {
int size;
HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext(
blockEncoder.getDataBlockEncoding(), HConstants.HFILEBLOCK_DUMMY_HEADER,
block.getHFileContext());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(block.getDummyHeaderForVersion());
DataOutputStream dos = new DataOutputStream(baos);
blockEncoder.startBlockEncoding(context, dos);
for (KeyValue kv : kvs) {
blockEncoder.encode(kv, context, dos);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
baos.writeTo(stream);
blockEncoder.endBlockEncoding(context, dos, stream.getBuffer(), BlockType.DATA);
byte[] encodedBytes = baos.toByteArray();
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
}
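The helper above spells out the new encode lifecycle: startBlockEncoding() opens the block, encode() is invoked once per KeyValue, and endBlockEncoding() finishes the block and settles its type. A hedged sketch of an additional check this test class could run against that helper (the method name checkEncodedBlockType is hypothetical; everything it calls is defined in this class):

private void checkEncodedBlockType(boolean useTag) throws IOException {
  // Build a block from generated cells and push it through the
  // cell-by-cell encode path wrapped by createBlockOnDisk().
  List<KeyValue> kvs = generator.generateTestKeyValues(60, useTag);
  HFileBlock block = getSampleHFileBlock(kvs, useTag);
  HFileBlock blockOnDisk = createBlockOnDisk(kvs, block, useTag);
  if (blockEncoder.getDataBlockEncoding() != DataBlockEncoding.NONE) {
    // A real encoder is expected to yield an ENCODED_DATA block.
    assertEquals(BlockType.ENCODED_DATA, blockOnDisk.getBlockType());
  } else {
    // The NoOp encoder used for NONE should leave a plain DATA block.
    assertEquals(BlockType.DATA, blockOnDisk.getBlockType());
  }
}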
/**
* @return All possible data block encoding configurations
*/
@ -195,10 +209,10 @@ public class TestHFileDataBlockEncoder {
new ArrayList<Object[]>();
for (DataBlockEncoding diskAlgo : DataBlockEncoding.values()) {
for (boolean includesMemstoreTS : new boolean[] {false, true}) {
configurations.add(new Object[] {
new HFileDataBlockEncoderImpl(diskAlgo),
new Boolean(includesMemstoreTS)});
for (boolean includesMemstoreTS : new boolean[] { false, true }) {
HFileDataBlockEncoder dbe = (diskAlgo == DataBlockEncoding.NONE) ?
NoOpDataBlockEncoder.INSTANCE : new HFileDataBlockEncoderImpl(diskAlgo);
configurations.add(new Object[] { dbe, new Boolean(includesMemstoreTS) });
}
}