HBASE-5521 [jira] Move compression/decompression to an encoder-specific encoding context

Author: Yongqiang He

Summary:
https://issues.apache.org/jira/browse/HBASE-5521

As part of working on HBASE-5313, we want to add a new columnar encoder/decoder.
It makes sense to move compression to be part of encoder/decoder:
1) a scanner for a columnar-encoded block can do lazy decompression down to a
specific part of a KeyValue object;
2) it avoids an extra byte-array copy from the encoder to the HFileBlock writer.

If there is no encoder specified for a writer, HFileBlock.Writer will use a
default compression context to do something very similar to today's code.
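
To make the new write path concrete, here is a minimal sketch assembled only from the APIs added in this diff (newDataBlockEncodingContext, the three-argument compressKeyValues, and the HFileBlockEncodingContext accessors). The PREFIX/GZ choices and the wrapper class are illustrative assumptions, not part of the patch.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class EncodingContextSketch {
  /** Encodes and (because GZ is configured) compresses serialized KeyValues. */
  public static byte[] encodeForDisk(ByteBuffer rawKeyValues,
      boolean includesMemstoreTS) throws IOException {
    DataBlockEncoder encoder = DataBlockEncoding.PREFIX.getEncoder();
    // The encoder creates its own context; the context owns the compressor
    // and the reusable output streams.
    HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(
        Compression.Algorithm.GZ, DataBlockEncoding.PREFIX,
        HFileBlock.DUMMY_HEADER);
    try {
      // Encoding and compression happen in one call now.
      encoder.compressKeyValues(rawKeyValues, includesMemstoreTS, ctx);
      // getUncompressedBytesWithHeader() would feed the block cache;
      // getOnDiskBytesWithHeader() is what a writer persists.
      return ctx.getOnDiskBytesWithHeader();
    } finally {
      ctx.close(); // returns the compressor to the pool
    }
  }
}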

Test Plan: existing unit tests, verified by mbautin and tedyu. No new tests are
added here since this change only prepares for the columnar encoder; test cases
will be added in that later diff.

Reviewers: dhruba, tedyu, sc, mbautin

Reviewed By: mbautin

Differential Revision: https://reviews.facebook.net/D2097

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1302602 13f79535-47bb-0310-9956-ffa450edef68
mbautin 2012-03-19 19:12:19 +00:00
parent 047d209cee
commit 6be64072e8
23 changed files with 1156 additions and 646 deletions

View File

@@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
@@ -301,4 +303,54 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
@Override
public HFileBlockEncodingContext newDataBlockEncodingContext(
Algorithm compressionAlgorithm,
DataBlockEncoding encoding, byte[] header) {
return new HFileBlockDefaultEncodingContext(
compressionAlgorithm, encoding, header);
}
@Override
public HFileBlockDecodingContext newDataBlockDecodingContext(
Algorithm compressionAlgorithm) {
return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
}
/**
* Compress KeyValues and write them to output buffer.
* @param out Where to write compressed data.
* @param in Source of KeyValue for compression.
* @param includesMemstoreTS true if including memstore timestamp after every
* key-value pair
* @throws IOException If there is an error writing to output stream.
*/
public abstract void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, boolean includesMemstoreTS) throws IOException;
@Override
public void compressKeyValues(ByteBuffer in,
boolean includesMemstoreTS,
HFileBlockEncodingContext blkEncodingCtx) throws IOException {
if (!(blkEncodingCtx.getClass().getName().equals(
HFileBlockDefaultEncodingContext.class.getName()))) {
throw new IOException (this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding();
DataOutputStream dataOut =
((HFileBlockDefaultEncodingContext) encodingCtx)
.getOutputStreamForEncoder();
internalEncodeKeyValues(dataOut, in, includesMemstoreTS);
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
} else {
encodingCtx.postEncoding(BlockType.DATA);
}
}
}

View File

@@ -33,7 +33,7 @@ import org.apache.hadoop.io.RawComparator;
@InterfaceAudience.Private
public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
@Override
-public void compressKeyValues(DataOutputStream out,
+public void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, boolean includesMemstoreTS) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());
@@ -94,4 +94,5 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
}
};
}
}

View File

@@ -17,12 +17,12 @@
package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.io.RawComparator;
/**
@@ -34,19 +34,32 @@ import org.apache.hadoop.io.RawComparator;
* <li>knowledge of Key Value format</li>
* </ul>
* It is designed to work fast enough to be feasible as in memory compression.
+*
+* After encoding, it also optionally compresses the encoded data if a
+* compression algorithm is specified in HFileBlockEncodingContext argument of
+* {@link #compressKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
*/
@InterfaceAudience.Private
public interface DataBlockEncoder {
/**
-* Compress KeyValues and write them to output buffer.
-* @param out Where to write compressed data.
-* @param in Source of KeyValue for compression.
-* @param includesMemstoreTS true if including memstore timestamp after every
-* key-value pair
-* @throws IOException If there is an error writing to output stream.
+* Compress KeyValues. It will first encode key value pairs, and then
+* optionally do the compression for the encoded data.
+*
+* @param in
+* Source of KeyValue for compression.
+* @param includesMemstoreTS
+* true if including memstore timestamp after every key-value pair
+* @param encodingContext
+* the encoding context which will contain encoded uncompressed bytes
+* as well as compressed encoded bytes if compression is enabled, and
+* also it will reuse resources across multiple calls.
+* @throws IOException
+* If there is an error writing to output stream.
*/
-public void compressKeyValues(DataOutputStream out,
-ByteBuffer in, boolean includesMemstoreTS) throws IOException;
+public void compressKeyValues(
+ByteBuffer in, boolean includesMemstoreTS,
+HFileBlockEncodingContext encodingContext) throws IOException;
/**
* Uncompress.
@@ -93,6 +106,34 @@ public interface DataBlockEncoder {
public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
boolean includesMemstoreTS);
/**
* Creates an encoder specific encoding context
*
* @param compressionAlgorithm
* compression algorithm used if the final data needs to be
* compressed
* @param encoding
* encoding strategy used
* @param headerBytes
* header bytes to be written, put a dummy header here if the header
* is unknown
* @return a newly created encoding context
*/
public HFileBlockEncodingContext newDataBlockEncodingContext(
Algorithm compressionAlgorithm, DataBlockEncoding encoding,
byte[] headerBytes);
/**
* Creates an encoder specific decoding context, which will prepare the data
* before actual decoding
*
* @param compressionAlgorithm
* compression algorithm used if the data needs to be decompressed
* @return a newly created decoding context
*/
public HFileBlockDecodingContext newDataBlockDecodingContext(
Algorithm compressionAlgorithm);
/**
* An interface which enable to seek while underlying data is encoded.
*

View File

@@ -103,6 +103,18 @@ public enum DataBlockEncoding {
stream.write(idInBytes);
}
/**
* Writes id bytes to the given array starting from offset.
*
* @param dest output array
* @param offset starting offset of the output array
* @throws IOException
*/
public void writeIdInBytes(byte[] dest, int offset) throws IOException {
System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE);
}
/**
* Return new data block encoder for given algorithm type.
* @return data block encoder if algorithm is specified, null if none is
@@ -112,25 +124,6 @@ public enum DataBlockEncoding {
return encoder;
}
/**
* Provide access to all data block encoders, even those which are not
* exposed in the enum. Useful for testing and benchmarking.
* @return list of all data block encoders.
*/
public static List<DataBlockEncoder> getAllEncoders() {
ArrayList<DataBlockEncoder> encoders = new ArrayList<DataBlockEncoder>();
for (DataBlockEncoding algo : values()) {
DataBlockEncoder encoder = algo.getEncoder();
if (encoder != null) {
encoders.add(encoder);
}
}
// Add encoders that are only used in testing.
encoders.add(new CopyKeyDataBlockEncoder());
return encoders;
}
/**
* Find and create data block encoder for given id;
* @param encoderId id of data block encoder.

View File

@@ -316,7 +316,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
-public void compressKeyValues(DataOutputStream out,
+public void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, boolean includesMemstoreTS) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());

View File

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encoding;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
@@ -27,6 +26,8 @@ import java.util.Iterator;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.io.compress.Compressor;
/**
@@ -40,17 +41,22 @@ public class EncodedDataBlock {
ByteArrayOutputStream uncompressedOutputStream;
ByteBuffer uncompressedBuffer;
private byte[] cacheCompressData;
-private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
private boolean includesMemstoreTS;
+private final HFileBlockEncodingContext encodingCxt;
/**
* Create a buffer which will be encoded using dataBlockEncoder.
* @param dataBlockEncoder Algorithm used for compression.
+* @param encoding encoding type used
*/
public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
-boolean includesMemstoreTS) {
+boolean includesMemstoreTS, DataBlockEncoding encoding) {
this.dataBlockEncoder = dataBlockEncoder;
uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
+encodingCxt =
+dataBlockEncoder.newDataBlockEncodingContext(Compression.Algorithm.NONE,
+encoding, HFileBlock.DUMMY_HEADER);
}
/**
@@ -175,7 +181,7 @@ public class EncodedDataBlock {
if (cacheCompressData != null) {
return cacheCompressData;
}
-cacheCompressData = doCompressData();
+cacheCompressData = encodeData();
return cacheCompressData;
}
@@ -190,22 +196,20 @@ public class EncodedDataBlock {
}
/**
-* Do the compression.
-* @return Compressed byte buffer.
+* Do the encoding.
+* @return encoded byte buffer.
*/
-public byte[] doCompressData() {
-compressedStream.reset();
-DataOutputStream dataOut = new DataOutputStream(compressedStream);
+public byte[] encodeData() {
try {
this.dataBlockEncoder.compressKeyValues(
-dataOut, getUncompressedBuffer(), includesMemstoreTS);
+getUncompressedBuffer(), includesMemstoreTS, encodingCxt);
} catch (IOException e) {
throw new RuntimeException(String.format(
-"Bug in decoding part of algorithm %s. " +
+"Bug in encoding part of algorithm %s. " +
"Probably it requested more bytes than are available.",
toString()), e);
}
-return compressedStream.toByteArray();
+return encodingCxt.getUncompressedBytesWithHeader();
}
@Override
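
For orientation, a rough sketch of how a caller (for example a benchmarking or test utility) would drive the reworked EncodedDataBlock: the DataBlockEncoding is now a constructor argument and encodeData() replaces doCompressData(). The addKv(KeyValue) helper used below is assumed from the pre-existing class (it is not touched by this diff), and the sample KeyValue is made up.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.util.Bytes;

public class EncodedDataBlockSketch {
  public static byte[] encodeSample() {
    DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;
    // The encoding is passed in so the block can build its own
    // HFileBlockEncodingContext internally (with Algorithm.NONE).
    EncodedDataBlock block = new EncodedDataBlock(
        encoding.getEncoder(), false /* includesMemstoreTS */, encoding);
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));
    block.addKv(kv); // assumed pre-existing method that appends a KeyValue
    // encodeData() returns the encoded bytes (with a dummy header) held by
    // the block's encoding context.
    return block.encodeData();
  }
}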

View File

@@ -343,7 +343,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
-public void compressKeyValues(DataOutputStream out,
+public void internalEncodeKeyValues(DataOutputStream out,
ByteBuffer in, boolean includesMemstoreTS) throws IOException {
in.rewind();
ByteBufferUtils.putInt(out, in.limit());

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
/**
* A decoding context that is created by a reader's encoder, and is shared
* across all of the reader's read operations.
*
* @see HFileBlockEncodingContext for encoding
*/
public interface HFileBlockDecodingContext {
/**
* @return the compression algorithm used by this decoding context
*/
public Compression.Algorithm getCompression();
/**
* Perform all actions that need to be done before the encoder's real
* decoding process. Decompression needs to be done if
* {@link #getCompression()} returns a valid compression algorithm.
*
* @param block HFile block object
* @param onDiskBlock on disk bytes to be decoded
* @param offset data start offset in onDiskBlock
* @throws IOException
*/
public void prepareDecoding(HFileBlock block, byte[] onDiskBlock,
int offset) throws IOException;
}
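
As an illustration of the contract above, a reader-side sketch: it assumes an HFileBlock and its raw on-disk bytes have already been read by the block reader, and only uses methods defined in this diff. The FAST_DIFF/GZ choices and the wrapper class are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class DecodingContextSketch {
  /** Decompresses the data section of an already-read block into its buffer. */
  public static void fillBlockBuffer(HFileBlock block, byte[] onDiskBlock,
      int offset) throws IOException {
    DataBlockEncoder encoder = DataBlockEncoding.FAST_DIFF.getEncoder();
    // In the real reader one context is created per reader and reused.
    HFileBlockDecodingContext ctx =
        encoder.newDataBlockDecodingContext(Compression.Algorithm.GZ);
    // Inflates onDiskBlock[offset..] into block.getBufferWithoutHeader().
    ctx.prepareDecoding(block, onDiskBlock, offset);
  }
}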

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
/**
* A default implementation of {@link HFileBlockDecodingContext}. It assumes the
* block data section is compressed as a whole.
*
* @see HFileBlockDefaultEncodingContext for the default compression context
*
*/
public class HFileBlockDefaultDecodingContext implements
HFileBlockDecodingContext {
private final Compression.Algorithm compressAlgo;
public HFileBlockDefaultDecodingContext(
Compression.Algorithm compressAlgo) {
this.compressAlgo = compressAlgo;
}
@Override
public void prepareDecoding(HFileBlock block,
byte[] onDiskBlock, int offset) throws IOException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(
onDiskBlock, offset,
block.getOnDiskSizeWithoutHeader()));
ByteBuffer buffer = block.getBufferWithoutHeader();
Compression.decompress(buffer.array(), buffer.arrayOffset(),
(InputStream) dis, block.getOnDiskSizeWithoutHeader(),
block.getUncompressedSizeWithoutHeader(), compressAlgo);
}
@Override
public Algorithm getCompression() {
return compressAlgo;
}
}

View File

@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
/**
* A default implementation of {@link HFileBlockEncodingContext}. It will
* compress the data section as one continuous buffer.
*
* @see HFileBlockDefaultDecodingContext for the decompression part
*
*/
public class HFileBlockDefaultEncodingContext implements
HFileBlockEncodingContext {
private byte[] onDiskBytesWithHeader;
private byte[] uncompressedBytesWithHeader;
private BlockType blockType;
private final DataBlockEncoding encodingAlgo;
/** Compressor, which is also reused between consecutive blocks. */
private Compressor compressor;
/** Compression output stream */
private CompressionOutputStream compressionStream;
/** Underlying stream to write compressed bytes to */
private ByteArrayOutputStream compressedByteStream;
/** Compression algorithm for all blocks this instance writes. */
private final Compression.Algorithm compressionAlgorithm;
private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
private DataOutputStream dataOut = new DataOutputStream(encodedStream);
private final byte[] dummyHeader;
/**
* @param compressionAlgorithm compression algorithm used
* @param encoding encoding used
* @param headerBytes dummy header bytes
*/
public HFileBlockDefaultEncodingContext(
Compression.Algorithm compressionAlgorithm,
DataBlockEncoding encoding, byte[] headerBytes) {
this.encodingAlgo = encoding;
this.compressionAlgorithm =
compressionAlgorithm == null ? NONE : compressionAlgorithm;
if (this.compressionAlgorithm != NONE) {
compressor = compressionAlgorithm.getCompressor();
compressedByteStream = new ByteArrayOutputStream();
try {
compressionStream =
compressionAlgorithm.createPlainCompressionStream(
compressedByteStream, compressor);
} catch (IOException e) {
throw new RuntimeException(
"Could not create compression stream for algorithm "
+ compressionAlgorithm, e);
}
}
if (headerBytes == null) {
dummyHeader = HFileBlock.DUMMY_HEADER;
} else {
dummyHeader = headerBytes;
}
}
/**
* @param compressionAlgorithm compression algorithm
* @param encoding encoding
*/
public HFileBlockDefaultEncodingContext(
Compression.Algorithm compressionAlgorithm,
DataBlockEncoding encoding) {
this(compressionAlgorithm, encoding, null);
}
/**
* prepare to start a new encoding.
* @throws IOException
*/
void prepareEncoding() throws IOException {
encodedStream.reset();
dataOut.write(dummyHeader);
if (encodingAlgo != null
&& encodingAlgo != DataBlockEncoding.NONE) {
encodingAlgo.writeIdInBytes(dataOut);
}
}
@Override
public void postEncoding(BlockType blockType)
throws IOException {
dataOut.flush();
compressAfterEncoding(encodedStream.toByteArray(), blockType);
this.blockType = blockType;
}
/**
* @param uncompressedBytesWithHeader
* @param blockType
* @throws IOException
*/
public void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
BlockType blockType) throws IOException {
compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
}
/**
* @param uncompressedBytesWithHeader
* @param blockType
* @param headerBytes
* @throws IOException
*/
protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
BlockType blockType, byte[] headerBytes) throws IOException {
this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
if (compressionAlgorithm != NONE) {
compressedByteStream.reset();
compressedByteStream.write(headerBytes);
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader,
headerBytes.length, uncompressedBytesWithHeader.length
- headerBytes.length);
compressionStream.flush();
compressionStream.finish();
onDiskBytesWithHeader = compressedByteStream.toByteArray();
} else {
onDiskBytesWithHeader = uncompressedBytesWithHeader;
}
this.blockType = blockType;
}
@Override
public byte[] getOnDiskBytesWithHeader() {
return onDiskBytesWithHeader;
}
@Override
public byte[] getUncompressedBytesWithHeader() {
return uncompressedBytesWithHeader;
}
@Override
public BlockType getBlockType() {
return blockType;
}
/**
* Releases the compressor this writer uses to compress blocks into the
* compressor pool.
*/
@Override
public void close() {
if (compressor != null) {
compressionAlgorithm.returnCompressor(compressor);
compressor = null;
}
}
@Override
public Algorithm getCompression() {
return this.compressionAlgorithm;
}
public DataOutputStream getOutputStreamForEncoder() {
return this.dataOut;
}
@Override
public DataBlockEncoding getDataBlockEncoding() {
return this.encodingAlgo;
}
@Override
public int getHeaderSize() {
return this.dummyHeader.length;
}
}
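
A small sketch of the non-data-block path that HFileBlock.Writer now takes with this class: no data block encoding (encoding passed as null), just compressAfterEncoding over the serialized block bytes. The GZ algorithm, BlockType.META and the wrapper class are illustrative assumptions; the input is expected to start with a dummy-header placeholder, as in the writer.

import java.io.IOException;

import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class DefaultEncodingContextSketch {
  public static byte[] compressBlock(byte[] uncompressedBytesWithHeader)
      throws IOException {
    HFileBlockDefaultEncodingContext ctx =
        new HFileBlockDefaultEncodingContext(Compression.Algorithm.GZ, null);
    try {
      // Compresses everything after the header region and prepends a fresh
      // header placeholder; with Algorithm.NONE it simply reuses the input.
      ctx.compressAfterEncoding(uncompressedBytesWithHeader, BlockType.META);
      return ctx.getOnDiskBytesWithHeader();
    } finally {
      ctx.close(); // returns the compressor to the pool
    }
  }
}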

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression;
/**
* An encoding context that is created by a writer's encoder, and is shared
* across the writer's whole lifetime.
*
* @see HFileBlockDecodingContext for decoding
*
*/
public interface HFileBlockEncodingContext {
/**
* @return encoded and compressed bytes with header which are ready to write
* out to disk
*/
public byte[] getOnDiskBytesWithHeader();
/**
* @return encoded but not heavily compressed bytes with header which can be
* cached in block cache
*/
public byte[] getUncompressedBytesWithHeader();
/**
* @return the block type after encoding
*/
public BlockType getBlockType();
/**
* @return the compression algorithm used by this encoding context
*/
public Compression.Algorithm getCompression();
/**
* @return the header size used
*/
public int getHeaderSize();
/**
* @return the {@link DataBlockEncoding} encoding used
*/
public DataBlockEncoding getDataBlockEncoding();
/**
* Do any action that needs to be performed after the encoding.
* Compression is also included if {@link #getCompression()} returns non-null
* compression algorithm
*
* @param blockType
* @throws IOException
*/
public void postEncoding(BlockType blockType) throws IOException;
/**
* Releases the resources used.
*/
public void close();
}

View File

@@ -75,7 +75,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
@Override
-public void compressKeyValues(DataOutputStream writeHere,
+public void internalEncodeKeyValues(DataOutputStream writeHere,
ByteBuffer in, boolean includesMemstoreTS) throws IOException {
in.rewind();
ByteBufferUtils.putInt(writeHere, in.limit());

View File

@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -234,7 +235,7 @@ public final class Compression {
* Creates a compression stream without any additional wrapping into
* buffering streams.
*/
-CompressionOutputStream createPlainCompressionStream(
+public CompressionOutputStream createPlainCompressionStream(
OutputStream downStream, Compressor compressor) throws IOException {
CompressionCodec codec = getCodec(conf);
((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
@@ -323,4 +324,52 @@ public final class Compression {
return ret;
}
/**
* Decompresses data from the given stream using the configured compression
* algorithm. It will throw an exception if the dest buffer does not have
* enough space to hold the decompressed data.
*
* @param dest
* the output bytes buffer
* @param destOffset
* start writing position of the output buffer
* @param bufferedBoundedStream
* a stream to read compressed data from, bounded to the exact amount
* of compressed data
* @param compressedSize
* compressed data size, header not included
* @param uncompressedSize
* uncompressed data size, header not included
* @param compressAlgo
* compression algorithm used
* @throws IOException
*/
public static void decompress(byte[] dest, int destOffset,
InputStream bufferedBoundedStream, int compressedSize,
int uncompressedSize, Compression.Algorithm compressAlgo)
throws IOException {
if (dest.length - destOffset < uncompressedSize) {
throw new IllegalArgumentException(
"Output buffer does not have enough space to hold "
+ uncompressedSize + " decompressed bytes, available: "
+ (dest.length - destOffset));
}
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
InputStream is = compressAlgo.createDecompressionStream(
bufferedBoundedStream, decompressor, 0);
IOUtils.readFully(is, dest, destOffset, uncompressedSize);
is.close();
} finally {
if (decompressor != null) {
compressAlgo.returnDecompressor(decompressor);
}
}
}
}
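
For reference, a minimal sketch of calling the new static Compression.decompress helper directly. The sizes are assumed to be known from the block header, as they are in HFileBlock's reader, and the wrapper class is illustrative.

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.Compression;

public class DecompressSketch {
  public static void inflate(byte[] compressed, byte[] dest,
      int uncompressedSize) throws IOException {
    Compression.decompress(
        dest, 0,                               // output buffer and offset
        new ByteArrayInputStream(compressed),  // bounded compressed input
        compressed.length,                     // compressed size, no header
        uncompressedSize,                      // expected uncompressed size
        Compression.Algorithm.GZ);             // algorithm used when writing
  }
}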

View File

@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
@@ -29,7 +28,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,6 +36,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
@@ -45,14 +47,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompoundBloomFilter;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
import com.google.common.base.Preconditions;
@@ -114,8 +111,8 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
* There is a 1 byte checksum type, followed by a 4 byte bytesPerChecksum
* followed by another 4 byte value to store sizeofDataOnDisk.
*/
-static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE +
-2 * Bytes.SIZEOF_INT;
+public static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM +
+Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT;
/**
* The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
@@ -125,7 +122,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
+ DataBlockEncoding.ID_SIZE;
/** Just an array of bytes of the right size. */
-static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
+public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
new byte[HEADER_SIZE_NO_CHECKSUM];
@@ -303,7 +300,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
* @return the on-disk size of the data part of the block, header and
* checksum not included.
*/
-int getOnDiskSizeWithoutHeader() {
+public int getOnDiskSizeWithoutHeader() {
return onDiskSizeWithoutHeader;
}
@@ -342,7 +339,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
*
* @return the buffer with header skipped
*/
-ByteBuffer getBufferWithoutHeader() {
+public ByteBuffer getBufferWithoutHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
buf.limit() - headerSize() - totalChecksumBytes()).slice();
}
@@ -644,6 +641,11 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
/** Data block encoder used for data blocks */
private final HFileDataBlockEncoder dataBlockEncoder;
private HFileBlockEncodingContext dataBlockEncodingCtx;
/** block encoding context for non-data blocks */
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
/**
* The stream we use to accumulate data in uncompressed format for each
* block. We reset this stream at the end of each block and reuse it. The
@@ -652,15 +654,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
*/
private ByteArrayOutputStream baosInMemory;
-/** Compressor, which is also reused between consecutive blocks. */
-private Compressor compressor;
-/** Compression output stream */
-private CompressionOutputStream compressionStream;
-/** Underlying stream to write compressed bytes to */
-private ByteArrayOutputStream compressedByteStream;
/**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@@ -681,12 +674,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
*/
private byte[] onDiskBytesWithHeader;
-/**
-* The size of the data on disk that does not include the checksums.
-* (header + data)
-*/
-private int onDiskDataSizeWithHeader;
/**
* The size of the checksum data on disk. It is used only if data is
* not compressed. If data is compressed, then the checksums are already
@@ -734,28 +721,23 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS,
ChecksumType checksumType, int bytesPerChecksum) {
-compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
+compressAlgo = compressionAlgorithm == null ? Compression.Algorithm.NONE :
+compressionAlgorithm;
this.dataBlockEncoder = dataBlockEncoder != null
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
+defaultBlockEncodingCtx =
+new HFileBlockDefaultEncodingContext(compressionAlgorithm, null);
+dataBlockEncodingCtx =
+this.dataBlockEncoder.newOnDiskDataBlockEncodingContext(
+compressionAlgorithm, DUMMY_HEADER);
+baosInMemory = new ByteArrayOutputStream();
-if (compressAlgo != NONE) {
-compressor = compressionAlgorithm.getCompressor();
-compressedByteStream = new ByteArrayOutputStream();
-try {
-compressionStream =
-compressionAlgorithm.createPlainCompressionStream(
-compressedByteStream, compressor);
-} catch (IOException e) {
-throw new RuntimeException("Could not create compression stream " +
-"for algorithm " + compressionAlgorithm, e);
-}
-}
if (bytesPerChecksum < HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HEADER_SIZE + " but the configured value is " +
bytesPerChecksum);
}
-baosInMemory = new ByteArrayOutputStream();
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
@@ -828,7 +810,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
*/
private void finishBlock() throws IOException {
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
prevOffset = prevOffsetByType[blockType.getId()];
@@ -837,81 +818,32 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
// cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed.
state = State.BLOCK_READY;
-encodeDataBlockForDisk();
-doCompressionAndChecksumming();
-}
-/**
-* Do compression if it is enabled, or re-use the uncompressed buffer if
-* it is not. Fills in the compressed block's header if doing compression.
-* Also, compute the checksums. In the case of no-compression, write the
-* checksums to its own seperate data structure called onDiskChecksum. In
-* the case when compression is enabled, the checksums are written to the
-* outputbyte stream 'baos'.
-*/
-private void doCompressionAndChecksumming() throws IOException {
-// do the compression
-if (compressAlgo != NONE) {
-compressedByteStream.reset();
-compressedByteStream.write(DUMMY_HEADER);
-compressionStream.resetState();
-compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
-uncompressedBytesWithHeader.length - HEADER_SIZE);
-compressionStream.flush();
-compressionStream.finish();
-// generate checksums
-onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
-// reserve space for checksums in the output byte stream
-ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,
-onDiskDataSizeWithHeader, bytesPerChecksum);
-onDiskBytesWithHeader = compressedByteStream.toByteArray();
-putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
-uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
-// generate checksums for header and data. The checksums are
-// part of onDiskBytesWithHeader itself.
-ChecksumUtil.generateChecksums(
-onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
-onDiskBytesWithHeader, onDiskDataSizeWithHeader,
-checksumType, bytesPerChecksum);
-// Checksums are already part of onDiskBytesWithHeader
-onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
-//set the header for the uncompressed bytes (for cache-on-write)
-putHeader(uncompressedBytesWithHeader, 0,
-onDiskBytesWithHeader.length + onDiskChecksum.length,
-uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
+if (blockType == BlockType.DATA) {
+encodeDataBlockForDisk();
} else {
-// If we are not using any compression, then the
-// checksums are written to its own array onDiskChecksum.
-onDiskBytesWithHeader = uncompressedBytesWithHeader;
-onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
-int numBytes = (int)ChecksumUtil.numBytes(
-uncompressedBytesWithHeader.length,
-bytesPerChecksum);
-onDiskChecksum = new byte[numBytes];
-//set the header for the uncompressed bytes
-putHeader(uncompressedBytesWithHeader, 0,
-onDiskBytesWithHeader.length + onDiskChecksum.length,
-uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
-ChecksumUtil.generateChecksums(
-uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
-onDiskChecksum, 0,
-checksumType, bytesPerChecksum);
+defaultBlockEncodingCtx.compressAfterEncoding(
+uncompressedBytesWithHeader, blockType);
+onDiskBytesWithHeader =
+defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
}
+int numBytes = (int) ChecksumUtil.numBytes(
+onDiskBytesWithHeader.length,
+bytesPerChecksum);
+// put the header for on disk bytes
+putHeader(onDiskBytesWithHeader, 0,
+onDiskBytesWithHeader.length + numBytes,
+uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+//set the header for the uncompressed bytes (for cache-on-write)
+putHeader(uncompressedBytesWithHeader, 0,
+onDiskBytesWithHeader.length + numBytes,
+uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
+onDiskChecksum = new byte[numBytes];
+ChecksumUtil.generateChecksums(
+onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+onDiskChecksum, 0, checksumType, bytesPerChecksum);
}
/**
@@ -919,35 +851,20 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
* {@link #dataBlockEncoder}.
*/
private void encodeDataBlockForDisk() throws IOException {
-if (blockType != BlockType.DATA) {
-return; // skip any non-data block
-}
// do data block encoding, if data block encoder is set
-ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
-HEADER_SIZE, uncompressedBytesWithHeader.length -
-HEADER_SIZE).slice();
-Pair<ByteBuffer, BlockType> encodingResult =
-dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
-includesMemstoreTS, DUMMY_HEADER);
-BlockType encodedBlockType = encodingResult.getSecond();
-if (encodedBlockType == BlockType.ENCODED_DATA) {
-uncompressedBytesWithHeader = encodingResult.getFirst().array();
-blockType = BlockType.ENCODED_DATA;
-} else {
-// There is no encoding configured. Do some extra sanity-checking.
-if (encodedBlockType != BlockType.DATA) {
-throw new IOException("Unexpected block type coming out of data " +
-"block encoder: " + encodedBlockType);
-}
-if (userDataStream.size() !=
-uncompressedBytesWithHeader.length - HEADER_SIZE) {
-throw new IOException("Uncompressed size mismatch: "
-+ userDataStream.size() + " vs. "
-+ (uncompressedBytesWithHeader.length - HEADER_SIZE));
-}
-}
+ByteBuffer rawKeyValues =
+ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
+uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
+//do the encoding
+dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+includesMemstoreTS, dataBlockEncodingCtx, blockType);
+uncompressedBytesWithHeader =
+dataBlockEncodingCtx.getUncompressedBytesWithHeader();
+onDiskBytesWithHeader =
+dataBlockEncodingCtx.getOnDiskBytesWithHeader();
+blockType = dataBlockEncodingCtx.getBlockType();
}
/**
@@ -966,7 +883,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
offset = Bytes.putLong(dest, offset, prevOffset);
offset = Bytes.putByte(dest, offset, checksumType.getCode());
offset = Bytes.putInt(dest, offset, bytesPerChecksum);
-offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader);
+offset = Bytes.putInt(dest, offset, onDiskDataSize);
}
/**
@@ -986,7 +903,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
}
startOffset = offset;
-writeHeaderAndData((DataOutputStream) out);
+finishBlockAndWriteHeaderAndData((DataOutputStream) out);
}
/**
@@ -998,17 +915,11 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
* @param out the output stream to write the
* @throws IOException
*/
-private void writeHeaderAndData(DataOutputStream out) throws IOException {
+private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
+throws IOException {
ensureBlockReady();
out.write(onDiskBytesWithHeader);
-if (compressAlgo == NONE) {
-if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
-throw new IOException("A " + blockType
-+ " without compression should have checksums "
-+ " stored separately.");
-}
-out.write(onDiskChecksum);
-}
+out.write(onDiskChecksum);
}
/**
@@ -1023,34 +934,29 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
*/
byte[] getHeaderAndDataForTest() throws IOException {
ensureBlockReady();
-if (compressAlgo == NONE) {
-if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
-throw new IOException("A " + blockType
-+ " without compression should have checksums "
-+ " stored separately.");
-}
-// This is not very optimal, because we are doing an extra copy.
-// But this method is used only by unit tests.
-byte[] output = new byte[onDiskBytesWithHeader.length +
-onDiskChecksum.length];
-System.arraycopy(onDiskBytesWithHeader, 0,
-output, 0, onDiskBytesWithHeader.length);
-System.arraycopy(onDiskChecksum, 0,
-output, onDiskBytesWithHeader.length,
-onDiskChecksum.length);
-return output;
-}
-return onDiskBytesWithHeader;
+// This is not very optimal, because we are doing an extra copy.
+// But this method is used only by unit tests.
+byte[] output =
+new byte[onDiskBytesWithHeader.length
++ onDiskChecksum.length];
+System.arraycopy(onDiskBytesWithHeader, 0, output, 0,
+onDiskBytesWithHeader.length);
+System.arraycopy(onDiskChecksum, 0, output,
+onDiskBytesWithHeader.length, onDiskChecksum.length);
+return output;
}
/**
-* Releases the compressor this writer uses to compress blocks into the
-* compressor pool. Needs to be called before the writer is discarded.
+* Releases resources used by this writer.
*/
-public void releaseCompressor() {
-if (compressor != null) {
-compressAlgo.returnCompressor(compressor);
-compressor = null;
-}
-}
+public void release() {
+if (dataBlockEncodingCtx != null) {
+dataBlockEncodingCtx.close();
+dataBlockEncodingCtx = null;
+}
+if (defaultBlockEncodingCtx != null) {
+defaultBlockEncodingCtx.close();
+defaultBlockEncodingCtx = null;
+}
+}
@@ -1252,7 +1158,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
private int minorVersion;
/** The size of the header */
-protected int hdrSize;
+protected final int hdrSize;
/** The filesystem used to access data */
protected HFileSystem hfs;
@@ -1376,36 +1282,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
hdrSize;
}
/**
* Decompresses data from the given stream using the configured compression
* algorithm.
* @param dest
* @param destOffset
* @param bufferedBoundedStream
* a stream to read compressed data from, bounded to the exact
* amount of compressed data
* @param uncompressedSize
* uncompressed data size, header not included
* @throws IOException
*/
protected void decompress(byte[] dest, int destOffset,
InputStream bufferedBoundedStream,
int uncompressedSize) throws IOException {
Decompressor decompressor = null;
try {
decompressor = compressAlgo.getDecompressor();
InputStream is = compressAlgo.createDecompressionStream(
bufferedBoundedStream, decompressor, 0);
IOUtils.readFully(is, dest, destOffset, uncompressedSize);
is.close();
} finally {
if (decompressor != null) {
compressAlgo.returnDecompressor(decompressor);
}
}
}
/**
* Creates a buffered stream reading a certain slice of the file system
* input stream. We need this because the decompression we use seems to
@@ -1511,8 +1387,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
} else {
InputStream bufferedBoundedStream = createBufferedBoundedStream(
offset, onDiskSize, pread);
-decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
-bufferedBoundedStream, uncompressedSizeWithMagic);
+Compression.decompress(buf.array(), buf.arrayOffset()
++ HEADER_DELTA, bufferedBoundedStream, onDiskSize,
+uncompressedSizeWithMagic, this.compressAlgo);
// We don't really have a good way to exclude the "magic record" size
// from the compressed block's size, since it is compressed as well.
@@ -1566,6 +1443,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
protected HFileDataBlockEncoder dataBlockEncoder =
NoOpDataBlockEncoder.INSTANCE;
private HFileBlockDecodingContext encodedBlockDecodingCtx;
private HFileBlockDefaultDecodingContext defaultDecodingCtx;
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
new ThreadLocal<PrefetchedHeader>() {
@Override
@@ -1598,6 +1479,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
useHBaseChecksum = false;
}
this.useHBaseChecksumConfigured = useHBaseChecksum;
defaultDecodingCtx =
new HFileBlockDefaultDecodingContext(compressAlgo);
encodedBlockDecodingCtx =
new HFileBlockDefaultDecodingContext(compressAlgo);
}
/**
@@ -1716,9 +1601,8 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
-long onDiskSizeWithHeaderL,
-int uncompressedSize, boolean pread, boolean verifyChecksum)
-throws IOException {
+long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread,
+boolean verifyChecksum) throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL
@@ -1738,8 +1622,20 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
}
int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
-HFileBlock b;
+// See if we can avoid reading the header. This is desirable, because
+// we will not incur a backward seek operation if we have already
+// read this block's header as part of the previous read's look-ahead.
+// And we also want to skip reading the header again if it has already
+// been read.
+PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
+ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
+prefetchedHeader.buf : null;
+int nextBlockOnDiskSize = 0;
+// Allocate enough space to fit the next block's header too.
+byte[] onDiskBlock = null;
+HFileBlock b = null;
if (onDiskSizeWithHeader > 0) {
// We know the total on-disk size but not the uncompressed size. Read
// the entire block into memory, then parse the header and decompress
@@ -1749,172 +1645,117 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
// block's header (e.g. this block's header) when reading the previous
// block. This is the faster and more preferable case.
// Size that we have to skip in case we have already read the header.
int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
true, offset + preReadHeaderSize, pread);
if (headerBuf != null) {
// the header has been read when reading the previous block, copy
// to this block's header
System.arraycopy(headerBuf.array(),
headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
} else {
headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
}
// We know the total on-disk size but not the uncompressed size. Read
// the entire block into memory, then parse the header and decompress
// from memory if using compression. Here we have already read the
// block's header
try {
b = new HFileBlock(headerBuf, getMinorVersion());
} catch (IOException ex) {
// Seen in load testing. Provide comprehensive debug info.
throw new IOException("Failed to read compressed block at "
+ offset
+ ", onDiskSizeWithoutHeader="
+ onDiskSizeWithHeader
+ ", preReadHeaderSize="
+ hdrSize
+ ", header.length="
+ prefetchedHeader.header.length
+ ", header bytes: "
+ Bytes.toStringBinary(prefetchedHeader.header, 0,
hdrSize), ex);
}
// if the caller specifies a onDiskSizeWithHeader, validate it.
int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize;
assert onDiskSizeWithoutHeader >= 0;
b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
// See if we can avoid reading the header. This is desirable, because
// we will not incur a seek operation to seek back if we have already
// read this block's header as part of the previous read's look-ahead.
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
byte[] header = prefetchedHeader.offset == offset
? prefetchedHeader.header : null;
// Size that we have to skip in case we have already read the header.
int preReadHeaderSize = header == null ? 0 : hdrSize;
if (compressAlgo == Compression.Algorithm.NONE) {
// Just read the whole thing. Allocate enough space to read the
// next block's header too.
ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader
+ hdrSize);
headerAndData.limit(onDiskSizeWithHeader);
if (header != null) {
System.arraycopy(header, 0, headerAndData.array(), 0,
hdrSize);
}
int nextBlockOnDiskSizeWithHeader = readAtOffset(is,
headerAndData.array(), headerAndData.arrayOffset()
+ preReadHeaderSize, onDiskSizeWithHeader
- preReadHeaderSize, true, offset + preReadHeaderSize,
pread);
b = new HFileBlock(headerAndData, getMinorVersion());
b.assumeUncompressed();
b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader;
if (verifyChecksum &&
!validateBlockChecksum(b, headerAndData.array(), hdrSize)) {
return null; // checksum mismatch
}
if (b.nextBlockOnDiskSizeWithHeader > 0)
setNextBlockHeader(offset, b);
} else {
// Allocate enough space to fit the next block's header too.
byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
true, offset + preReadHeaderSize, pread);
if (header == null)
header = onDiskBlock;
try {
b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize),
getMinorVersion());
} catch (IOException ex) {
// Seen in load testing. Provide comprehensive debug info.
throw new IOException("Failed to read compressed block at "
+ offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader
+ ", preReadHeaderSize=" + preReadHeaderSize
+ ", header.length=" + header.length + ", header bytes: "
+ Bytes.toStringBinary(header, 0, hdrSize), ex);
}
b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
if (verifyChecksum &&
!validateBlockChecksum(b, onDiskBlock, hdrSize)) {
return null; // checksum mismatch
}
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
onDiskBlock, hdrSize, onDiskSizeWithoutHeader));
// This will allocate a new buffer but keep header bytes.
b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
b.uncompressedSizeWithoutHeader);
// Copy next block's header bytes into the new block if we have them.
if (nextBlockOnDiskSize > 0) {
System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
b.buf.arrayOffset() + hdrSize
+ b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
hdrSize);
setNextBlockHeader(offset, b);
}
}
} else { } else {
// We don't know the on-disk size. Read the header first, determine the // Check headerBuf to see if we have read this block's header as part of
// on-disk size from it, and read the remaining data, thereby incurring // reading the previous block. This is an optimization of peeking into
// two read operations. This might happen when we are doing the first // the next block's header (e.g.this block's header) when reading the
// read in a series of reads or a random read, and we don't have access // previous block. This is the faster and more preferable case. If the
// to the block index. This is costly and should happen very rarely. // header is already there, don't read the header again.
// Check if we have read this block's header as part of reading the
// previous block. If so, don't read the header again.
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
prefetchedHeader.buf : null;
// Unfortunately, we still have to do a separate read operation to
// read the header.
if (headerBuf == null) { if (headerBuf == null) {
// Unfortunately, we still have to do a separate read operation to // From the header, determine the on-disk size of the given hfile
// read the header. // block, and read the remaining data, thereby incurring two read
// operations. This might happen when we are doing the first read
// in a series of reads or a random read, and we don't have access
// to the block index. This is costly and should happen very rarely.
headerBuf = ByteBuffer.allocate(hdrSize); headerBuf = ByteBuffer.allocate(hdrSize);
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
false, offset, pread); hdrSize, false, offset, pread);
} }
b = new HFileBlock(headerBuf, getMinorVersion()); b = new HFileBlock(headerBuf, getMinorVersion());
onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
System.arraycopy(headerBuf.array(),
headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
nextBlockOnDiskSize =
readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
- hdrSize, true, offset + hdrSize, pread);
onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
}
// This will also allocate enough room for the next block's header. boolean isCompressed =
b.allocateBuffer(true); compressAlgo != null
&& compressAlgo != Compression.Algorithm.NONE;
if (!isCompressed) {
b.assumeUncompressed();
}
if (compressAlgo == Compression.Algorithm.NONE) { if (verifyChecksum &&
!validateBlockChecksum(b, onDiskBlock, hdrSize)) {
return null; // checksum mismatch
}
// Avoid creating bounded streams and using a "codec" that does if (isCompressed) {
// nothing. // This will allocate a new buffer but keep header bytes.
b.assumeUncompressed(); b.allocateBuffer(nextBlockOnDiskSize > 0);
b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, b.buf.array(), if (b.blockType.equals(BlockType.ENCODED_DATA)) {
b.buf.arrayOffset() + hdrSize, encodedBlockDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
true, offset + hdrSize,
pread);
if (verifyChecksum &&
!validateBlockChecksum(b, b.buf.array(), hdrSize)) {
return null; // checksum mismatch
}
if (b.nextBlockOnDiskSizeWithHeader > 0) {
setNextBlockHeader(offset, b);
}
} else { } else {
// Allocate enough space for the block's header and compressed data. defaultDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
+ hdrSize];
b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes,
hdrSize, b.onDiskSizeWithoutHeader, true, offset
+ hdrSize, pread);
if (verifyChecksum &&
!validateBlockChecksum(b, compressedBytes, hdrSize)) {
return null; // checksum mismatch
}
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
compressedBytes, hdrSize, b.onDiskSizeWithoutHeader));
decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis,
b.uncompressedSizeWithoutHeader);
if (b.nextBlockOnDiskSizeWithHeader > 0) {
// Copy the next block's header into the new block.
int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
+ b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
System.arraycopy(compressedBytes,
compressedBytes.length - hdrSize,
b.buf.array(),
nextHeaderOffset,
hdrSize);
setNextBlockHeader(offset, b);
}
} }
if (nextBlockOnDiskSize > 0) {
// Copy next block's header bytes into the new block if we have them.
System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
b.buf.arrayOffset() + hdrSize
+ b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
hdrSize);
}
} else {
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next
// block's header in it.
b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
onDiskSizeWithHeader), getMinorVersion());
}
b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
// Set prefetched header
if (b.nextBlockOnDiskSizeWithHeader > 0) {
prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
prefetchedHeader.header, 0, hdrSize);
} }
b.includesMemstoreTS = includesMemstoreTS; b.includesMemstoreTS = includesMemstoreTS;
@ -1922,21 +1763,14 @@ public class HFileBlock extends SchemaConfigured implements Cacheable {
return b; return b;
} }
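The read path above leans on a small per-thread cache: when a block is read, the following block's header comes along as look-ahead and is stashed so the next read can skip a backward seek. Below is a minimal, self-contained model of that pattern; it uses plain java.nio only, and the class and method names (PrefetchedHeaderCache, headerIfPrefetched, rememberNextHeader) are illustrative, not the actual HFileBlock internals.

import java.nio.ByteBuffer;

// Simplified model of the thread-local prefetched-header cache used while
// reading consecutive HFile blocks. Names are illustrative, not HBase API.
public class PrefetchedHeaderCache {
  static final int HDR_SIZE = 24; // assumed fixed block header size

  static class CachedHeader {
    long offset = -1;                         // file offset this header belongs to
    final byte[] header = new byte[HDR_SIZE]; // raw header bytes
  }

  private static final ThreadLocal<CachedHeader> CACHE =
      new ThreadLocal<CachedHeader>() {
        @Override protected CachedHeader initialValue() {
          return new CachedHeader();
        }
      };

  /** Returns the cached header for the block at {@code offset}, or null. */
  static ByteBuffer headerIfPrefetched(long offset) {
    CachedHeader c = CACHE.get();
    return c.offset == offset ? ByteBuffer.wrap(c.header) : null;
  }

  /** Remembers the next block's header bytes that were read as look-ahead. */
  static void rememberNextHeader(long nextOffset, byte[] onDiskBlock, int from) {
    CachedHeader c = CACHE.get();
    c.offset = nextOffset;
    System.arraycopy(onDiskBlock, from, c.header, 0, HDR_SIZE);
  }
}

A reader would call headerIfPrefetched(offset) before issuing any I/O, and rememberNextHeader(offset + onDiskSizeWithHeader, onDiskBlock, onDiskSizeWithHeader) once a block plus its trailing look-ahead header has been pulled in.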
private void setNextBlockHeader(long offset, HFileBlock b) {
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
int nextHeaderOffset = b.buf.arrayOffset() + hdrSize
+ b.uncompressedSizeWithoutHeader + b.totalChecksumBytes();
System.arraycopy(b.buf.array(), nextHeaderOffset,
prefetchedHeader.header, 0, hdrSize);
}
void setIncludesMemstoreTS(boolean enabled) { void setIncludesMemstoreTS(boolean enabled) {
includesMemstoreTS = enabled; includesMemstoreTS = enabled;
} }
void setDataBlockEncoder(HFileDataBlockEncoder encoder) { void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
this.dataBlockEncoder = encoder; this.dataBlockEncoder = encoder;
encodedBlockDecodingCtx = encoder.newOnDiskDataBlockDecodingContext(
this.compressAlgo);
} }
/** /**

@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Controls what kind of data block encoding is used. If data block encoding is * Controls what kind of data block encoding is used. If data block encoding is
@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.util.Pair;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface HFileDataBlockEncoder { public interface HFileDataBlockEncoder {
/** /**
* Converts a block from the on-disk format to the in-cache format. Called in * Converts a block from the on-disk format to the in-cache format. Called in
* the following cases: * the following cases:
@ -51,12 +54,14 @@ public interface HFileDataBlockEncoder {
* Should be called before an encoded or unencoded data block is written to * Should be called before an encoded or unencoded data block is written to
* disk. * disk.
* @param in KeyValues next to each other * @param in KeyValues next to each other
* @param dummyHeader A dummy header to be written as a placeholder * @param encodingResult the encoded result
* @return a non-null on-heap buffer containing the contents of the * @param blockType block type
* HFileBlock with unfilled header and block type * @throws IOException
*/ */
public Pair<ByteBuffer, BlockType> beforeWriteToDisk( public void beforeWriteToDisk(
ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader); ByteBuffer in, boolean includesMemstoreTS,
HFileBlockEncodingContext encodingResult,
BlockType blockType) throws IOException;
/** /**
* Decides whether we should use a scanner over encoded blocks. * Decides whether we should use a scanner over encoded blocks.
@ -85,4 +90,27 @@ public interface HFileDataBlockEncoder {
*/ */
public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction); public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
/**
* Create an encoder-specific encoding context object for writing. The
* encoding context should also perform compression if compressionAlgorithm is
* valid.
*
* @param compressionAlgorithm compression algorithm
* @param headerBytes header bytes
* @return a new {@link HFileBlockEncodingContext} object
*/
public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
Algorithm compressionAlgorithm, byte[] headerBytes);
/**
* Create an encoder-specific decoding context for reading. The decoding
* context should also perform decompression if compressionAlgorithm is
* valid.
*
* @param compressionAlgorithm compression algorithm
* @return a new {@link HFileBlockDecodingContext} object
*/
public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
Algorithm compressionAlgorithm);
} }
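To make the contract above concrete, here is a hedged sketch of how a block writer is expected to drive these factory methods. The class and method names (newOnDiskDataBlockEncodingContext, beforeWriteToDisk, getOnDiskBytesWithHeader) come straight from this patch; creating the context inline rather than caching it per writer is a simplification for the sketch.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;

public class EncoderWritePathSketch {
  /**
   * Encodes (and compresses, when algo is not NONE) one data block's worth
   * of KeyValues through the encoder-owned context and returns the bytes
   * that would be written to disk. A real writer creates the context once
   * and reuses it for every block.
   */
  static byte[] encodeForDisk(HFileDataBlockEncoder encoder,
      ByteBuffer rawKeyValues, boolean includesMemstoreTS,
      Algorithm algo, byte[] dummyHeader) throws IOException {
    HFileBlockEncodingContext ctx =
        encoder.newOnDiskDataBlockEncodingContext(algo, dummyHeader);
    encoder.beforeWriteToDisk(rawKeyValues, includesMemstoreTS, ctx,
        BlockType.DATA);
    return ctx.getOnDiskBytesWithHeader();
  }
}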

@ -16,18 +16,20 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -39,6 +41,7 @@ import com.google.common.base.Preconditions;
public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
private final DataBlockEncoding onDisk; private final DataBlockEncoding onDisk;
private final DataBlockEncoding inCache; private final DataBlockEncoding inCache;
private final HFileBlockEncodingContext inCacheEncodeCtx;
public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) { public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) {
this(encoding, encoding); this(encoding, encoding);
@ -54,10 +57,36 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
*/ */
public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk, public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk,
DataBlockEncoding inCache) { DataBlockEncoding inCache) {
this(onDisk, inCache, null);
}
/**
* Do data block encoding with specified options.
* @param onDisk What kind of data block encoding will be used before writing
* HFileBlock to disk. This must be either the same as inCache or
* {@link DataBlockEncoding#NONE}.
* @param inCache What kind of data block encoding will be used in block
* cache.
* @param dummyHeader dummy header bytes
*/
public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk,
DataBlockEncoding inCache, byte[] dummyHeader) {
dummyHeader = dummyHeader == null ? HFileBlock.DUMMY_HEADER : dummyHeader;
this.onDisk = onDisk != null ? this.onDisk = onDisk != null ?
onDisk : DataBlockEncoding.NONE; onDisk : DataBlockEncoding.NONE;
this.inCache = inCache != null ? this.inCache = inCache != null ?
inCache : DataBlockEncoding.NONE; inCache : DataBlockEncoding.NONE;
if (inCache != DataBlockEncoding.NONE) {
inCacheEncodeCtx =
this.inCache.getEncoder().newDataBlockEncodingContext(
Algorithm.NONE, this.inCache, dummyHeader);
} else {
// create a default encoding context
inCacheEncodeCtx =
new HFileBlockDefaultEncodingContext(Algorithm.NONE,
this.inCache, dummyHeader);
}
Preconditions.checkArgument(onDisk == DataBlockEncoding.NONE || Preconditions.checkArgument(onDisk == DataBlockEncoding.NONE ||
onDisk == inCache, "on-disk encoding (" + onDisk + ") must be " + onDisk == inCache, "on-disk encoding (" + onDisk + ") must be " +
"either the same as in-cache encoding (" + inCache + ") or " + "either the same as in-cache encoding (" + inCache + ") or " +
@ -131,7 +160,8 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
return block; return block;
} }
// Encode the unencoded block with the in-cache encoding. // Encode the unencoded block with the in-cache encoding.
return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS()); return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS(),
inCacheEncodeCtx);
} }
if (block.getBlockType() == BlockType.ENCODED_DATA) { if (block.getBlockType() == BlockType.ENCODED_DATA) {
@ -149,21 +179,25 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
} }
/** /**
* Precondition: a non-encoded buffer. * Precondition: a non-encoded buffer. Postcondition: on-disk encoding.
* Postcondition: on-disk encoding. *
* The encoded results can be stored in {@link HFileBlockEncodingContext}.
*
* @throws IOException
*/ */
@Override @Override
public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in, public void beforeWriteToDisk(ByteBuffer in,
boolean includesMemstoreTS, byte[] dummyHeader) { boolean includesMemstoreTS,
HFileBlockEncodingContext encodeCtx,
BlockType blockType) throws IOException {
if (onDisk == DataBlockEncoding.NONE) { if (onDisk == DataBlockEncoding.NONE) {
// there is no need to encode the block before writing it to disk // there is no need to encode the block before writing it to disk
return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA); ((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncoding(
in.array(), blockType);
return;
} }
encodeBufferToHFileBlockBuffer(in, onDisk,
ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in, includesMemstoreTS, encodeCtx);
onDisk, includesMemstoreTS, dummyHeader);
return new Pair<ByteBuffer, BlockType>(encodedBuffer,
BlockType.ENCODED_DATA);
} }
@Override @Override
@ -174,34 +208,42 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
return inCache != DataBlockEncoding.NONE; return inCache != DataBlockEncoding.NONE;
} }
private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in, /**
* Encode a block of key value pairs.
*
* @param in input data to encode
* @param algo encoding algorithm
* @param includesMemstoreTS includes memstore timestamp or not
* @param encodeCtx where will the output data be stored
*/
private void encodeBufferToHFileBlockBuffer(ByteBuffer in,
DataBlockEncoding algo, boolean includesMemstoreTS, DataBlockEncoding algo, boolean includesMemstoreTS,
byte[] dummyHeader) { HFileBlockEncodingContext encodeCtx) {
ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(encodedStream);
DataBlockEncoder encoder = algo.getEncoder(); DataBlockEncoder encoder = algo.getEncoder();
try { try {
encodedStream.write(dummyHeader); encoder.compressKeyValues(in, includesMemstoreTS, encodeCtx);
algo.writeIdInBytes(dataOut);
encoder.compressKeyValues(dataOut, in,
includesMemstoreTS);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(String.format("Bug in data block encoder " + throw new RuntimeException(String.format(
"'%s', it probably requested too much data", algo.toString()), e); "Bug in data block encoder "
+ "'%s', it probably requested too much data, " +
"exception message: %s.",
algo.toString(), e.getMessage()), e);
} }
return ByteBuffer.wrap(encodedStream.toByteArray());
} }
private HFileBlock encodeDataBlock(HFileBlock block, private HFileBlock encodeDataBlock(HFileBlock block,
DataBlockEncoding algo, boolean includesMemstoreTS) { DataBlockEncoding algo, boolean includesMemstoreTS,
ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer( HFileBlockEncodingContext encodingCtx) {
block.getBufferWithoutHeader(), algo, includesMemstoreTS, encodeBufferToHFileBlockBuffer(
HFileBlock.DUMMY_HEADER); block.getBufferWithoutHeader(), algo, includesMemstoreTS, encodingCtx);
int sizeWithoutHeader = compressedBuffer.limit() - block.headerSize(); byte[] encodedUncompressedBytes =
encodingCtx.getUncompressedBytesWithHeader();
ByteBuffer bufferWrapper = ByteBuffer.wrap(encodedUncompressedBytes);
int sizeWithoutHeader = bufferWrapper.limit() - encodingCtx.getHeaderSize();
HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA, HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA,
block.getOnDiskSizeWithoutHeader(), block.getOnDiskSizeWithoutHeader(),
sizeWithoutHeader, block.getPrevBlockOffset(), sizeWithoutHeader, block.getPrevBlockOffset(),
compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(), bufferWrapper, HFileBlock.FILL_HEADER, block.getOffset(),
includesMemstoreTS, block.getMinorVersion(), includesMemstoreTS, block.getMinorVersion(),
block.getBytesPerChecksum(), block.getChecksumType(), block.getBytesPerChecksum(), block.getChecksumType(),
block.getOnDiskDataSizeWithHeader()); block.getOnDiskDataSizeWithHeader());
@ -215,4 +257,31 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
inCache + ")"; inCache + ")";
} }
@Override
public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
Algorithm compressionAlgorithm, byte[] dummyHeader) {
if (onDisk != null) {
DataBlockEncoder encoder = onDisk.getEncoder();
if (encoder != null) {
return encoder.newDataBlockEncodingContext(
compressionAlgorithm, onDisk, dummyHeader);
}
}
return new HFileBlockDefaultEncodingContext(compressionAlgorithm,
null, dummyHeader);
}
@Override
public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
Algorithm compressionAlgorithm) {
if (onDisk != null) {
DataBlockEncoder encoder = onDisk.getEncoder();
if (encoder != null) {
return encoder.newDataBlockDecodingContext(
compressionAlgorithm);
}
}
return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
}
} }
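On the read side, the symmetric decoding context is used. The sketch below shows the dispatch the HFileBlock reader performs after pulling a block off disk; the three-argument prepareDecoding(block, onDiskBlock, headerSize) call is copied from the reader hunk earlier in this diff and should be treated as an assumption about the final interface rather than a definitive signature.

import java.io.IOException;

import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class BlockDecodeDispatchSketch {
  /**
   * Routes a freshly read block to the right decoding context: encoded data
   * blocks go through the encoder-specific context, everything else through
   * the default (decompress-only) context.
   */
  static void prepareBlock(HFileBlock block, byte[] onDiskBlock, int hdrSize,
      HFileBlockDecodingContext encodedCtx,
      HFileBlockDecodingContext defaultCtx) throws IOException {
    boolean encodedData = block.getBlockType() == BlockType.ENCODED_DATA;
    HFileBlockDecodingContext ctx = encodedData ? encodedCtx : defaultCtx;
    ctx.prepareDecoding(block, onDiskBlock, hdrSize);
  }
}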

@ -432,7 +432,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
finishClose(trailer); finishClose(trailer);
fsBlockWriter.releaseCompressor(); fsBlockWriter.release();
} }
@Override @Override

@ -16,12 +16,17 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Does not perform any kind of encoding/decoding. * Does not perform any kind of encoding/decoding.
@ -45,9 +50,19 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
} }
@Override @Override
public Pair<ByteBuffer, BlockType> beforeWriteToDisk( public void beforeWriteToDisk(ByteBuffer in,
ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader) { boolean includesMemstoreTS,
return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA); HFileBlockEncodingContext encodeCtx, BlockType blockType)
throws IOException {
if (!(encodeCtx.getClass().getName().equals(
HFileBlockDefaultEncodingContext.class.getName()))) {
throw new IOException (this.getClass().getName() + " only accepts " +
HFileBlockDefaultEncodingContext.class.getName() + ".");
}
HFileBlockDefaultEncodingContext defaultContext =
(HFileBlockDefaultEncodingContext) encodeCtx;
defaultContext.compressAfterEncoding(in.array(), blockType);
} }
@Override @Override
@ -79,4 +94,17 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
return getClass().getSimpleName(); return getClass().getSimpleName();
} }
@Override
public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext(
Algorithm compressionAlgorithm, byte[] dummyHeader) {
return new HFileBlockDefaultEncodingContext(compressionAlgorithm,
null, dummyHeader);
}
@Override
public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext(
Algorithm compressionAlgorithm) {
return new HFileBlockDefaultDecodingContext(compressionAlgorithm);
}
} }
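For the no-encoding case, beforeWriteToDisk reduces to "compress the payload, keep a header placeholder in front". A self-contained toy model of that step is sketched below using java.util.zip; the real HFileBlockDefaultEncodingContext goes through the configured Hadoop compression codec and later overwrites the header placeholder with real sizes, so treat this strictly as an illustration.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Toy model of a compress-only encoding context: no data block encoding,
// just a header placeholder followed by the compressed payload.
public class CompressOnlyContextSketch {
  static byte[] compressAfterEncoding(byte[] uncompressedWithHeader,
      int headerSize) throws IOException {
    ByteArrayOutputStream onDisk = new ByteArrayOutputStream();
    // Keep the (dummy) header uncompressed so the writer can fill it in later.
    onDisk.write(uncompressedWithHeader, 0, headerSize);
    GZIPOutputStream gz = new GZIPOutputStream(onDisk);
    gz.write(uncompressedWithHeader, headerSize,
        uncompressedWithHeader.length - headerSize);
    gz.finish(); // flush the compressed payload into onDisk
    return onDisk.toByteArray();
  }
}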

@ -20,9 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -34,6 +32,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -51,6 +51,9 @@ public class TestDataBlockEncoders {
static int NUMBER_OF_KV = 10000; static int NUMBER_OF_KV = 10000;
static int NUM_RANDOM_SEEKS = 10000; static int NUM_RANDOM_SEEKS = 10000;
private static int ENCODED_DATA_OFFSET =
HFileBlock.HEADER_SIZE + DataBlockEncoding.ID_SIZE;
private RedundantKVGenerator generator = new RedundantKVGenerator(); private RedundantKVGenerator generator = new RedundantKVGenerator();
private Random randomizer = new Random(42l); private Random randomizer = new Random(42l);
@ -65,17 +68,44 @@ public class TestDataBlockEncoders {
this.includesMemstoreTS = includesMemstoreTS; this.includesMemstoreTS = includesMemstoreTS;
} }
private void testAlgorithm(ByteBuffer dataset, DataBlockEncoder encoder) private HFileBlockEncodingContext getEncodingContext(
Compression.Algorithm algo, DataBlockEncoding encoding) {
DataBlockEncoder encoder = encoding.getEncoder();
if (encoder != null) {
return encoder.newDataBlockEncodingContext(algo, encoding,
HFileBlock.DUMMY_HEADER);
} else {
return new HFileBlockDefaultEncodingContext(algo, encoding);
}
}
private byte[] encodeBytes(DataBlockEncoding encoding,
ByteBuffer dataset) throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
HFileBlockEncodingContext encodingCtx =
getEncodingContext(Compression.Algorithm.NONE, encoding);
encoder.compressKeyValues(dataset, includesMemstoreTS,
encodingCtx);
byte[] encodedBytesWithHeader =
encodingCtx.getUncompressedBytesWithHeader();
byte[] encodedData =
new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET];
System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData,
0, encodedData.length);
return encodedData;
}
private void testAlgorithm(ByteBuffer dataset, DataBlockEncoding encoding)
throws IOException { throws IOException {
// encode // encode
ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] encodedBytes = encodeBytes(encoding, dataset);
DataOutputStream dataOut = new DataOutputStream(baos); //decode
encoder.compressKeyValues(dataOut, dataset, includesMemstoreTS); ByteArrayInputStream bais = new ByteArrayInputStream(encodedBytes);
// decode
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream dis = new DataInputStream(bais); DataInputStream dis = new DataInputStream(bais);
ByteBuffer actualDataset; ByteBuffer actualDataset;
DataBlockEncoder encoder = encoding.getEncoder();
actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS); actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS);
dataset.rewind(); dataset.rewind();
@ -142,17 +172,17 @@ public class TestDataBlockEncoders {
ByteBuffer originalBuffer = ByteBuffer originalBuffer =
RedundantKVGenerator.convertKvToByteBuffer(sampleKv, RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS); includesMemstoreTS);
List<DataBlockEncoder> dataBlockEncoders =
DataBlockEncoding.getAllEncoders();
// create all seekers // create all seekers
List<DataBlockEncoder.EncodedSeeker> encodedSeekers = List<DataBlockEncoder.EncodedSeeker> encodedSeekers =
new ArrayList<DataBlockEncoder.EncodedSeeker>(); new ArrayList<DataBlockEncoder.EncodedSeeker>();
for (DataBlockEncoder encoder : dataBlockEncoders) { for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); if (encoding.getEncoder() == null) {
DataOutputStream dataOut = new DataOutputStream(baos); continue;
encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); }
ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray()); ByteBuffer encodedBuffer =
ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
DataBlockEncoder encoder = encoding.getEncoder();
DataBlockEncoder.EncodedSeeker seeker = DataBlockEncoder.EncodedSeeker seeker =
encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS); encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
seeker.setCurrentBuffer(encodedBuffer); seeker.setCurrentBuffer(encodedBuffer);
@ -195,20 +225,19 @@ public class TestDataBlockEncoders {
ByteBuffer originalBuffer = ByteBuffer originalBuffer =
RedundantKVGenerator.convertKvToByteBuffer(sampleKv, RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS); includesMemstoreTS);
List<DataBlockEncoder> dataBlockEncoders =
DataBlockEncoding.getAllEncoders();
for (DataBlockEncoder encoder : dataBlockEncoders) { for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); if (encoding.getEncoder() == null) {
DataOutputStream dataOut = new DataOutputStream(baos); continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = null;
try { try {
encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(String.format( throw new RuntimeException(String.format(
"Bug while encoding using '%s'", encoder.toString()), e); "Bug while encoding using '%s'", encoder.toString()), e);
} }
ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
DataBlockEncoder.EncodedSeeker seeker = DataBlockEncoder.EncodedSeeker seeker =
encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS); encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
seeker.setCurrentBuffer(encodedBuffer); seeker.setCurrentBuffer(encodedBuffer);
@ -255,20 +284,19 @@ public class TestDataBlockEncoders {
ByteBuffer originalBuffer = ByteBuffer originalBuffer =
RedundantKVGenerator.convertKvToByteBuffer(sampleKv, RedundantKVGenerator.convertKvToByteBuffer(sampleKv,
includesMemstoreTS); includesMemstoreTS);
List<DataBlockEncoder> dataBlockEncoders =
DataBlockEncoding.getAllEncoders();
for (DataBlockEncoder encoder : dataBlockEncoders) { for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); if (encoding.getEncoder() == null) {
DataOutputStream dataOut = new DataOutputStream(baos); continue;
}
DataBlockEncoder encoder = encoding.getEncoder();
ByteBuffer encodedBuffer = null;
try { try {
encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(String.format( throw new RuntimeException(String.format(
"Bug while encoding using '%s'", encoder.toString()), e); "Bug while encoding using '%s'", encoder.toString()), e);
} }
ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray());
ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer); ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
KeyValue firstKv = sampleKv.get(0); KeyValue firstKv = sampleKv.get(0);
if (0 != Bytes.compareTo( if (0 != Bytes.compareTo(
@ -327,16 +355,17 @@ public class TestDataBlockEncoders {
private void testEncodersOnDataset(ByteBuffer onDataset) private void testEncodersOnDataset(ByteBuffer onDataset)
throws IOException{ throws IOException{
List<DataBlockEncoder> dataBlockEncoders =
DataBlockEncoding.getAllEncoders();
ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity()); ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity());
onDataset.rewind(); onDataset.rewind();
dataset.put(onDataset); dataset.put(onDataset);
onDataset.rewind(); onDataset.rewind();
dataset.flip(); dataset.flip();
for (DataBlockEncoder encoder : dataBlockEncoders) { for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
testAlgorithm(dataset, encoder); if (encoding.getEncoder() == null) {
continue;
}
testAlgorithm(dataset, encoding);
// ensure that dataset is unchanged // ensure that dataset is unchanged
dataset.rewind(); dataset.rewind();

@ -50,12 +50,15 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream; import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Compressor;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
@ -203,7 +206,7 @@ public class TestHFileBlock {
writeTestBlockContents(dos); writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndDataForTest(); byte[] headerAndData = hbw.getHeaderAndDataForTest();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.releaseCompressor(); hbw.release();
return hbw; return hbw;
} }
@ -371,9 +374,8 @@ public class TestHFileBlock {
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>(); final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) { for (int blockId = 0; blockId < numBlocks; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA); DataOutputStream dos = hbw.startWriting(BlockType.DATA);
writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks, writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks,
blockId, includesMemstoreTS); blockId, includesMemstoreTS, HFileBlock.DUMMY_HEADER);
hbw.writeHeaderAndData(os); hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader(); totalSize += hbw.getOnDiskSizeWithHeader();
} }
@ -392,7 +394,6 @@ public class TestHFileBlock {
assertEquals(0, HFile.getChecksumFailuresCount()); assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck(); b.sanityCheck();
pos += b.getOnDiskSizeWithHeader(); pos += b.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId), assertEquals((int) encodedSizes.get(blockId),
b.getUncompressedSizeWithoutHeader()); b.getUncompressedSizeWithoutHeader());
ByteBuffer actualBuffer = b.getBufferWithoutHeader(); ByteBuffer actualBuffer = b.getBufferWithoutHeader();
@ -417,35 +418,52 @@ public class TestHFileBlock {
} }
} }
static void writeEncodedBlock(DataBlockEncoding encoding, static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding,
DataOutputStream dos, final List<Integer> encodedSizes, DataOutputStream dos, final List<Integer> encodedSizes,
final List<ByteBuffer> encodedBlocks, int blockId, final List<ByteBuffer> encodedBlocks, int blockId,
boolean includesMemstoreTS) throws IOException { boolean includesMemstoreTS, byte[] dummyHeader) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DoubleOutputStream doubleOutputStream = DoubleOutputStream doubleOutputStream =
new DoubleOutputStream(dos, baos); new DoubleOutputStream(dos, baos);
writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS);
final int rawBlockSize = writeTestKeyValues(doubleOutputStream,
blockId, includesMemstoreTS);
ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray()); ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray());
rawBuf.rewind(); rawBuf.rewind();
final int encodedSize; DataBlockEncoder encoder = encoding.getEncoder();
final ByteBuffer encodedBuf; int headerLen = dummyHeader.length;
if (encoding == DataBlockEncoding.NONE) { byte[] encodedResultWithHeader = null;
encodedSize = rawBlockSize; if (encoder != null) {
encodedBuf = rawBuf; HFileBlockEncodingContext encodingCtx =
encoder.newDataBlockEncodingContext(algo, encoding, dummyHeader);
encoder.compressKeyValues(rawBuf, includesMemstoreTS,
encodingCtx);
encodedResultWithHeader =
encodingCtx.getUncompressedBytesWithHeader();
} else { } else {
ByteArrayOutputStream encodedOut = new ByteArrayOutputStream(); HFileBlockDefaultEncodingContext defaultEncodingCtx =
encoding.getEncoder().compressKeyValues( new HFileBlockDefaultEncodingContext(algo, encoding, dummyHeader);
new DataOutputStream(encodedOut), byte[] rawBufWithHeader =
rawBuf.duplicate(), includesMemstoreTS); new byte[rawBuf.array().length + headerLen];
System.arraycopy(rawBuf.array(), 0, rawBufWithHeader,
headerLen, rawBuf.array().length);
defaultEncodingCtx.compressAfterEncoding(rawBufWithHeader,
BlockType.DATA);
encodedResultWithHeader =
defaultEncodingCtx.getUncompressedBytesWithHeader();
}
final int encodedSize =
encodedResultWithHeader.length - headerLen;
if (encoder != null) {
// We need to account for the two-byte encoding algorithm ID that // We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs. // comes after the 24-byte block header but before encoded KVs.
encodedSize = encodedOut.size() + DataBlockEncoding.ID_SIZE; headerLen += DataBlockEncoding.ID_SIZE;
encodedBuf = ByteBuffer.wrap(encodedOut.toByteArray());
} }
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize); encodedSizes.add(encodedSize);
encodedBlocks.add(encodedBuf); encodedBlocks.add(encodedBuf);
} }
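Both test helpers above recover the raw encoded key/value section by skipping the dummy block header and, when a real encoder is in play, the two-byte encoding id that follows it. A small hedged helper capturing that arithmetic (the two-byte id size is taken from the comments in this diff; the constant name is an assumption):

// Strips the block header and, for a real encoder, the 2-byte encoding id
// from the context output, leaving only the encoded key/value bytes.
public class EncodedSectionSketch {
  static final int ID_SIZE = 2; // assumed value of DataBlockEncoding.ID_SIZE

  static byte[] stripHeader(byte[] encodedWithHeader, int headerSize,
      boolean hasEncoder) {
    int skip = headerSize + (hasEncoder ? ID_SIZE : 0);
    byte[] data = new byte[encodedWithHeader.length - skip];
    System.arraycopy(encodedWithHeader, skip, data, 0, data.length);
    return data;
  }
}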

@ -19,7 +19,11 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.*; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -28,40 +32,23 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.Compressor;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -69,6 +56,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Preconditions;
/** /**
* This class has unit tests to prove that older versions of * This class has unit tests to prove that older versions of
* HFiles (without checksums) are compatible with current readers. * HFiles (without checksums) are compatible with current readers.
@ -129,7 +118,8 @@ public class TestHFileBlockCompatibility {
includesMemstoreTS); includesMemstoreTS);
DataOutputStream dos = hbw.startWriting(blockType); DataOutputStream dos = hbw.startWriting(blockType);
TestHFileBlock.writeTestBlockContents(dos); TestHFileBlock.writeTestBlockContents(dos);
byte[] headerAndData = hbw.getHeaderAndData(); // make sure the block is ready by calling hbw.getHeaderAndData()
hbw.getHeaderAndData();
assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
hbw.releaseCompressor(); hbw.releaseCompressor();
return hbw; return hbw;
@ -145,7 +135,7 @@ public class TestHFileBlockCompatibility {
// variations across operating systems. // variations across operating systems.
// See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
testV2Block[osOffset] = 3; testV2Block[osOffset] = 3;
} }
return Bytes.toStringBinary(testV2Block); return Bytes.toStringBinary(testV2Block);
} }
@ -173,8 +163,9 @@ public class TestHFileBlockCompatibility {
+ "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
+ "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00"; + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00";
final int correctGzipBlockLength = 82; final int correctGzipBlockLength = 82;
assertEquals(correctTestBlockStr, createTestBlockStr(GZ,
correctGzipBlockLength)); String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
assertEquals(correctTestBlockStr, returnedStr);
} }
@Test @Test
@ -288,16 +279,19 @@ public class TestHFileBlockCompatibility {
+ algo + "_" + encoding.toString()); + algo + "_" + encoding.toString());
FSDataOutputStream os = fs.create(path); FSDataOutputStream os = fs.create(path);
HFileDataBlockEncoder dataBlockEncoder = HFileDataBlockEncoder dataBlockEncoder =
new HFileDataBlockEncoderImpl(encoding); new HFileDataBlockEncoderImpl(encoding, encoding,
Writer hbw = new Writer(algo, dataBlockEncoder, TestHFileBlockCompatibility.Writer.DUMMY_HEADER);
includesMemstoreTS); TestHFileBlockCompatibility.Writer hbw =
new TestHFileBlockCompatibility.Writer(algo,
dataBlockEncoder, includesMemstoreTS);
long totalSize = 0; long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<Integer>(); final List<Integer> encodedSizes = new ArrayList<Integer>();
final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>(); final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
for (int blockId = 0; blockId < numBlocks; ++blockId) { for (int blockId = 0; blockId < numBlocks; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA); DataOutputStream dos = hbw.startWriting(BlockType.DATA);
TestHFileBlock.writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks, TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes,
blockId, includesMemstoreTS); encodedBlocks, blockId, includesMemstoreTS,
TestHFileBlockCompatibility.Writer.DUMMY_HEADER);
hbw.writeHeaderAndData(os); hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader(); totalSize += hbw.getOnDiskSizeWithHeader();
@ -332,8 +326,8 @@ public class TestHFileBlockCompatibility {
expectedBuffer.rewind(); expectedBuffer.rewind();
// test if content matches, produce nice message // test if content matches, produce nice message
TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
pread); algo, encoding, pread);
} }
is.close(); is.close();
} }
@ -378,6 +372,10 @@ public class TestHFileBlockCompatibility {
/** Data block encoder used for data blocks */ /** Data block encoder used for data blocks */
private final HFileDataBlockEncoder dataBlockEncoder; private final HFileDataBlockEncoder dataBlockEncoder;
private HFileBlockEncodingContext dataBlockEncodingCtx;
/** block encoding context for non-data blocks */
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
/** /**
* The stream we use to accumulate data in uncompressed format for each * The stream we use to accumulate data in uncompressed format for each
* block. We reset this stream at the end of each block and reuse it. The * block. We reset this stream at the end of each block and reuse it. The
@ -389,12 +387,6 @@ public class TestHFileBlockCompatibility {
/** Compressor, which is also reused between consecutive blocks. */ /** Compressor, which is also reused between consecutive blocks. */
private Compressor compressor; private Compressor compressor;
/** Compression output stream */
private CompressionOutputStream compressionStream;
/** Underlying stream to write compressed bytes to */
private ByteArrayOutputStream compressedByteStream;
/** /**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be * Current block type. Set in {@link #startWriting(BlockType)}. Could be
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
@ -449,19 +441,14 @@ public class TestHFileBlockCompatibility {
this.dataBlockEncoder = dataBlockEncoder != null this.dataBlockEncoder = dataBlockEncoder != null
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
defaultBlockEncodingCtx =
new HFileBlockDefaultEncodingContext(compressionAlgorithm,
null, DUMMY_HEADER);
dataBlockEncodingCtx =
this.dataBlockEncoder.newOnDiskDataBlockEncodingContext(
compressionAlgorithm, DUMMY_HEADER);
baosInMemory = new ByteArrayOutputStream(); baosInMemory = new ByteArrayOutputStream();
if (compressAlgo != NONE) {
compressor = compressionAlgorithm.getCompressor();
compressedByteStream = new ByteArrayOutputStream();
try {
compressionStream =
compressionAlgorithm.createPlainCompressionStream(
compressedByteStream, compressor);
} catch (IOException e) {
throw new RuntimeException("Could not create compression stream " +
"for algorithm " + compressionAlgorithm, e);
}
}
prevOffsetByType = new long[BlockType.values().length]; prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i) for (int i = 0; i < prevOffsetByType.length; ++i)
@ -532,48 +519,31 @@ public class TestHFileBlockCompatibility {
*/ */
private void finishBlock() throws IOException { private void finishBlock() throws IOException {
userDataStream.flush(); userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array. // This does an array copy, so it is safe to cache this byte array.
uncompressedBytesWithHeader = baosInMemory.toByteArray(); uncompressedBytesWithHeader = baosInMemory.toByteArray();
LOG.warn("Writer.finishBlock user data size with header before compression " +
uncompressedBytesWithHeader.length);
prevOffset = prevOffsetByType[blockType.getId()]; prevOffset = prevOffsetByType[blockType.getId()];
// We need to set state before we can package the block up for // We need to set state before we can package the block up for
// cache-on-write. In a way, the block is ready, but not yet encoded or // cache-on-write. In a way, the block is ready, but not yet encoded or
// compressed. // compressed.
state = State.BLOCK_READY; state = State.BLOCK_READY;
encodeDataBlockForDisk(); if (blockType == BlockType.DATA) {
encodeDataBlockForDisk();
doCompression();
putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
}
/**
* Do compression if it is enabled, or re-use the uncompressed buffer if
* it is not. Fills in the compressed block's header if doing compression.
*/
private void doCompression() throws IOException {
// do the compression
if (compressAlgo != NONE) {
compressedByteStream.reset();
compressedByteStream.write(DUMMY_HEADER);
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
uncompressedBytesWithHeader.length - HEADER_SIZE);
compressionStream.flush();
compressionStream.finish();
onDiskBytesWithHeader = compressedByteStream.toByteArray();
putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
} else { } else {
onDiskBytesWithHeader = uncompressedBytesWithHeader; defaultBlockEncodingCtx.compressAfterEncoding(
uncompressedBytesWithHeader, blockType);
onDiskBytesWithHeader =
defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
} }
// put the header for on disk bytes
putHeader(onDiskBytesWithHeader, 0,
onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
//set the header for the uncompressed bytes (for cache-on-write)
putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length);
} }
/** /**
@ -581,35 +551,20 @@ public class TestHFileBlockCompatibility {
* {@link #dataBlockEncoder}. * {@link #dataBlockEncoder}.
*/ */
private void encodeDataBlockForDisk() throws IOException { private void encodeDataBlockForDisk() throws IOException {
if (blockType != BlockType.DATA) {
return; // skip any non-data block
}
// do data block encoding, if data block encoder is set // do data block encoding, if data block encoder is set
ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader, ByteBuffer rawKeyValues =
HEADER_SIZE, uncompressedBytesWithHeader.length - ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE,
HEADER_SIZE).slice(); uncompressedBytesWithHeader.length - HEADER_SIZE).slice();
Pair<ByteBuffer, BlockType> encodingResult =
dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
includesMemstoreTS, DUMMY_HEADER);
BlockType encodedBlockType = encodingResult.getSecond(); //do the encoding
if (encodedBlockType == BlockType.ENCODED_DATA) { dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
uncompressedBytesWithHeader = encodingResult.getFirst().array(); includesMemstoreTS, dataBlockEncodingCtx, blockType);
blockType = BlockType.ENCODED_DATA;
} else { uncompressedBytesWithHeader =
// There is no encoding configured. Do some extra sanity-checking. dataBlockEncodingCtx.getUncompressedBytesWithHeader();
if (encodedBlockType != BlockType.DATA) { onDiskBytesWithHeader =
throw new IOException("Unexpected block type coming out of data " + dataBlockEncodingCtx.getOnDiskBytesWithHeader();
"block encoder: " + encodedBlockType); blockType = dataBlockEncodingCtx.getBlockType();
}
if (userDataStream.size() !=
uncompressedBytesWithHeader.length - HEADER_SIZE) {
throw new IOException("Uncompressed size mismatch: "
+ userDataStream.size() + " vs. "
+ (uncompressedBytesWithHeader.length - HEADER_SIZE));
}
}
} }
/** /**
@ -802,5 +757,6 @@ public class TestHFileBlockCompatibility {
getOnDiskSizeWithoutHeader()); getOnDiskSizeWithoutHeader());
} }
} }
} }

@ -30,11 +30,12 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator; import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -117,18 +118,23 @@ public class TestHFileDataBlockEncoder {
/** /**
* Test writing to disk. * Test writing to disk.
* @throws IOException
*/ */
@Test @Test
public void testEncodingWritePath() { public void testEncodingWritePath() throws IOException {
// usually we have just block without headers, but don't complicate that // usually we have just block without headers, but don't complicate that
HFileBlock block = getSampleHFileBlock(); HFileBlock block = getSampleHFileBlock();
Pair<ByteBuffer, BlockType> result = HFileBlockEncodingContext context =
blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), new HFileBlockDefaultEncodingContext(
includesMemstoreTS, HFileBlock.DUMMY_HEADER); Compression.Algorithm.NONE, blockEncoder.getEncodingOnDisk());
blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(),
includesMemstoreTS, context, block.getBlockType());
int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE; byte[] encodedBytes = context.getUncompressedBytesWithHeader();
HFileBlock blockOnDisk = new HFileBlock(result.getSecond(), int size = encodedBytes.length - HFileBlock.HEADER_SIZE;
size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0, HFileBlock blockOnDisk =
new HFileBlock(context.getBlockType(), size, size, -1,
ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0,
includesMemstoreTS, block.getMinorVersion(), includesMemstoreTS, block.getMinorVersion(),
block.getBytesPerChecksum(), block.getChecksumType(), block.getBytesPerChecksum(), block.getChecksumType(),
block.getOnDiskDataSizeWithHeader()); block.getOnDiskDataSizeWithHeader());

@ -115,11 +115,10 @@ public class DataBlockEncodingTool {
byte[] previousKey = null; byte[] previousKey = null;
byte[] currentKey; byte[] currentKey;
List<DataBlockEncoder> dataBlockEncoders = DataBlockEncoding[] encodings = DataBlockEncoding.values();
DataBlockEncoding.getAllEncoders(); for(DataBlockEncoding encoding : encodings) {
DataBlockEncoder d = encoding.getEncoder();
for (DataBlockEncoder d : dataBlockEncoders) { codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding));
codecs.add(new EncodedDataBlock(d, includesMemstoreTS));
} }
int j = 0; int j = 0;
@ -280,7 +279,7 @@ public class DataBlockEncodingTool {
List<Long> compressDurations = new ArrayList<Long>(); List<Long> compressDurations = new ArrayList<Long>();
for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) { for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) {
final long startTime = System.nanoTime(); final long startTime = System.nanoTime();
codec.doCompressData(); codec.encodeData();
final long finishTime = System.nanoTime(); final long finishTime = System.nanoTime();
if (itTime >= BENCHMARK_N_OMIT) { if (itTime >= BENCHMARK_N_OMIT) {
compressDurations.add(finishTime - startTime); compressDurations.add(finishTime - startTime);