HBASE-17623 Reuse the bytes array when building the hfile block

Chia-Ping Tsai 2017-03-22 03:50:48 +08:00 committed by CHIA-PING TSAI
parent faf81d5133
commit 6bd3109062
4 changed files with 87 additions and 84 deletions
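
In short: the block writer used to materialize every block with ByteArrayOutputStream.toByteArray(), allocating and copying a fresh array per block. After this change it works against the streams' backing arrays via getBuffer()/size(), reuses those streams across blocks, and only allocates a fresh copy when a block is handed to the block cache. A minimal sketch of the reuse pattern, assuming HBase's org.apache.hadoop.hbase.io.ByteArrayOutputStream (getBuffer(), size(), reset() and toByteArray() all appear in the patch; the class and method names of the sketch itself are illustrative):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

    public class BlockBufferReuseSketch {
      private final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);

      /** Old style: a fresh copy of the block bytes on every call. */
      byte[] finishBlockWithCopy() {
        return baos.toByteArray(); // allocates and copies for every block
      }

      /** New style: write straight from the reused backing array, then recycle it. */
      void finishBlockReusingBuffer(DataOutputStream out) throws IOException {
        out.write(baos.getBuffer(), 0, baos.size()); // no intermediate copy
        baos.reset(); // the next block overwrites the same array
      }
    }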

HFileBlockDefaultEncodingContext.java

@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.io.encoding;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
@ -48,7 +49,6 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class HFileBlockDefaultEncodingContext implements
HFileBlockEncodingContext {
private byte[] onDiskBytesWithHeader;
private BlockType blockType;
private final DataBlockEncoding encodingAlgo;
@ -128,17 +128,12 @@ public class HFileBlockDefaultEncodingContext implements
}
@Override
public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
return onDiskBytesWithHeader;
public Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException {
return compressAfterEncoding(data, offset, length, dummyHeader);
}
/**
* @param uncompressedBytesWithHeader
* @param headerBytes
* @throws IOException
*/
protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
private Bytes compressAfterEncoding(byte[] uncompressedBytesWithHeaderBuffer,
int uncompressedBytesWithHeaderOffset, int uncompressedBytesWithHeaderLength, byte[] headerBytes)
throws IOException {
Encryption.Context cryptoContext = fileContext.getEncryptionContext();
if (cryptoContext != Encryption.Context.NONE) {
@ -162,17 +157,17 @@ public class HFileBlockDefaultEncodingContext implements
if (fileContext.getCompression() != Compression.Algorithm.NONE) {
compressedByteStream.reset();
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader,
headerBytes.length, uncompressedBytesWithHeader.length - headerBytes.length);
compressionStream.write(uncompressedBytesWithHeaderBuffer,
headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength - headerBytes.length);
compressionStream.flush();
compressionStream.finish();
byte[] plaintext = compressedByteStream.toByteArray();
plaintextLength = plaintext.length;
in = new ByteArrayInputStream(plaintext);
} else {
plaintextLength = uncompressedBytesWithHeader.length - headerBytes.length;
in = new ByteArrayInputStream(uncompressedBytesWithHeader,
headerBytes.length, plaintextLength);
plaintextLength = uncompressedBytesWithHeaderLength - headerBytes.length;
in = new ByteArrayInputStream(uncompressedBytesWithHeaderBuffer,
headerBytes.length + uncompressedBytesWithHeaderOffset, plaintextLength);
}
if (plaintextLength > 0) {
@ -194,16 +189,13 @@ public class HFileBlockDefaultEncodingContext implements
// Encrypt the data
Encryption.encrypt(cryptoByteStream, in, encryptor);
onDiskBytesWithHeader = cryptoByteStream.toByteArray();
// Increment the IV given the final block size
Encryption.incrementIv(iv, 1 + (onDiskBytesWithHeader.length / encryptor.getBlockSize()));
Encryption.incrementIv(iv, 1 + (cryptoByteStream.size() / encryptor.getBlockSize()));
return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
} else {
cryptoByteStream.write(0);
onDiskBytesWithHeader = cryptoByteStream.toByteArray();
return new Bytes(cryptoByteStream.getBuffer(), 0, cryptoByteStream.size());
}
} else {
@ -212,14 +204,14 @@ public class HFileBlockDefaultEncodingContext implements
compressedByteStream.reset();
compressedByteStream.write(headerBytes);
compressionStream.resetState();
compressionStream.write(uncompressedBytesWithHeader,
headerBytes.length, uncompressedBytesWithHeader.length
compressionStream.write(uncompressedBytesWithHeaderBuffer,
headerBytes.length + uncompressedBytesWithHeaderOffset, uncompressedBytesWithHeaderLength
- headerBytes.length);
compressionStream.flush();
compressionStream.finish();
onDiskBytesWithHeader = compressedByteStream.toByteArray();
return new Bytes(compressedByteStream.getBuffer(), 0, compressedByteStream.size());
} else {
onDiskBytesWithHeader = uncompressedBytesWithHeader;
return null;
}
}
}

HFileBlockEncodingContext.java

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.Bytes;
/**
* An encoding context that is created by a writer's encoder, and is shared
@ -73,9 +74,14 @@ public interface HFileBlockEncodingContext {
EncodingState getEncodingState();
/**
* @param uncompressedBytesWithHeader encoded bytes with header
* @return Bytes with header which are ready to write out to disk. This is compressed and
* encrypted bytes applying the set compression algorithm and encryption.
* @param data encoded bytes with header
* @param offset the offset in encoded data to start at
* @param length the number of encoded bytes
* @return Bytes with header which are ready to write out to disk.
* This is the compressed and encrypted output, produced by applying the set
* compression algorithm and encryption. The returned bytes may be changed later
* (the underlying buffer is reused), so if a Bytes reference is needed for later
* use, clone the bytes and use the copy.
* Null if the data doesn't need to be compressed and encrypted.
*/
byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
Bytes compressAndEncrypt(byte[] data, int offset, int length) throws IOException;
}
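
The contract above matters because the returned Bytes may alias an internal buffer that the encoding context reuses for the next block. A condensed sketch of how a caller is expected to consume it, mirroring what HFileBlock.Writer does in the next file (the helper name and variables are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CompressAndEncryptCallerSketch {
      /** Turns an encoded block (header + data) into on-disk bytes, copying out of any reused buffer. */
      static void toOnDiskBytes(HFileBlockEncodingContext ctx,
          ByteArrayOutputStream encodedWithHeader,
          ByteArrayOutputStream onDiskWithHeader) throws IOException {
        Bytes result = ctx.compressAndEncrypt(encodedWithHeader.getBuffer(), 0, encodedWithHeader.size());
        if (result == null) {
          // Null means no compression and no encryption: the encoded bytes already are the on-disk bytes.
          result = new Bytes(encodedWithHeader.getBuffer(), 0, encodedWithHeader.size());
        }
        // The view may be overwritten by the next block, so copy it into a stream we own.
        onDiskWithHeader.reset();
        onDiskWithHeader.write(result.get(), result.getOffset(), result.getLength());
      }
    }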

HFileBlock.java

@ -326,7 +326,7 @@ public class HFileBlock implements Cacheable {
/**
* Creates a new {@link HFile} block from the given fields. This constructor
* is used when the block data has already been read and uncompressed,
* is used only while writing blocks and caching,
* and is sitting in a byte buffer and we want to stuff the block into cache.
* See {@link Writer#getBlockForCaching(CacheConfig)}.
*
@ -338,8 +338,7 @@ public class HFileBlock implements Cacheable {
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
* @param prevBlockOffset see {@link #prevBlockOffset}
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
* uncompressed data.
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
* @param fillHeader when true, write the first 4 header fields into passed buffer.
* @param offset the file offset the block was read from
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
@ -877,7 +876,7 @@ public class HFileBlock implements Cacheable {
* if compression is turned on. It also includes the checksum data that
* immediately follows the block data. (header + data + checksums)
*/
private byte[] onDiskBlockBytesWithHeader;
private ByteArrayOutputStream onDiskBlockBytesWithHeader;
/**
* The size of the checksum data on disk. It is used only if data is
@ -887,15 +886,6 @@ public class HFileBlock implements Cacheable {
*/
private byte[] onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
/**
* Valid in the READY state. Contains the header and the uncompressed (but
* potentially encoded, if this is a data block) bytes, so the length is
* {@link #uncompressedSizeWithoutHeader} +
* {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
* Does not store checksums.
*/
private byte[] uncompressedBlockBytesWithHeader;
/**
* Current block's start offset in the {@link HFile}. Set in
* {@link #writeHeaderAndData(FSDataOutputStream)}.
@ -1023,42 +1013,42 @@ public class HFileBlock implements Cacheable {
blockType = dataBlockEncodingCtx.getBlockType();
}
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array when cache-on-write.
// Header is still the empty, 'dummy' header that is yet to be filled out.
uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
prevOffset = prevOffsetByType[blockType.getId()];
// We need to set state before we can package the block up for cache-on-write. In a way, the
// block is ready, but not yet encoded or compressed.
state = State.BLOCK_READY;
Bytes compressAndEncryptDat;
if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
onDiskBlockBytesWithHeader = dataBlockEncodingCtx.
compressAndEncrypt(uncompressedBlockBytesWithHeader);
compressAndEncryptDat = dataBlockEncodingCtx.
compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
} else {
onDiskBlockBytesWithHeader = defaultBlockEncodingCtx.
compressAndEncrypt(uncompressedBlockBytesWithHeader);
compressAndEncryptDat = defaultBlockEncodingCtx.
compressAndEncrypt(baosInMemory.getBuffer(), 0, baosInMemory.size());
}
if (compressAndEncryptDat == null) {
compressAndEncryptDat = new Bytes(baosInMemory.getBuffer(), 0, baosInMemory.size());
}
if (onDiskBlockBytesWithHeader == null) {
onDiskBlockBytesWithHeader = new ByteArrayOutputStream(compressAndEncryptDat.getLength());
}
onDiskBlockBytesWithHeader.reset();
onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBlockBytesWithHeader.length,
onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
// Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBlockBytesWithHeader, 0,
onDiskBlockBytesWithHeader.length + numBytes,
uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
// Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
// onDiskBlockBytesWithHeader array.
if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) {
putHeader(uncompressedBlockBytesWithHeader, 0,
onDiskBlockBytesWithHeader.length + numBytes,
uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length);
}
putHeader(onDiskBlockBytesWithHeader,
onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
if (onDiskChecksum.length != numBytes) {
onDiskChecksum = new byte[numBytes];
}
ChecksumUtil.generateChecksums(
onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length,
onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size(),
onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
}
@ -1081,6 +1071,11 @@ public class HFileBlock implements Cacheable {
Bytes.putInt(dest, offset, onDiskDataSize);
}
private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
int uncompressedSize, int onDiskDataSize) {
putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
}
/**
* Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
* the offset of this block so that it can be referenced in the next block
@ -1113,7 +1108,7 @@ public class HFileBlock implements Cacheable {
protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
throws IOException {
ensureBlockReady();
out.write(onDiskBlockBytesWithHeader);
out.write(onDiskBlockBytesWithHeader.getBuffer(), 0, onDiskBlockBytesWithHeader.size());
out.write(onDiskChecksum);
}
@ -1132,12 +1127,12 @@ public class HFileBlock implements Cacheable {
// This is not very optimal, because we are doing an extra copy.
// But this method is used only by unit tests.
byte[] output =
new byte[onDiskBlockBytesWithHeader.length
new byte[onDiskBlockBytesWithHeader.size()
+ onDiskChecksum.length];
System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0,
onDiskBlockBytesWithHeader.length);
System.arraycopy(onDiskBlockBytesWithHeader.getBuffer(), 0, output, 0,
onDiskBlockBytesWithHeader.size());
System.arraycopy(onDiskChecksum, 0, output,
onDiskBlockBytesWithHeader.length, onDiskChecksum.length);
onDiskBlockBytesWithHeader.size(), onDiskChecksum.length);
return output;
}
@ -1165,7 +1160,7 @@ public class HFileBlock implements Cacheable {
*/
int getOnDiskSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return onDiskBlockBytesWithHeader.length +
return onDiskBlockBytesWithHeader.size() +
onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
}
@ -1178,7 +1173,7 @@ public class HFileBlock implements Cacheable {
*/
int getOnDiskSizeWithHeader() {
expectState(State.BLOCK_READY);
return onDiskBlockBytesWithHeader.length + onDiskChecksum.length;
return onDiskBlockBytesWithHeader.size() + onDiskChecksum.length;
}
/**
@ -1186,7 +1181,7 @@ public class HFileBlock implements Cacheable {
*/
int getUncompressedSizeWithoutHeader() {
expectState(State.BLOCK_READY);
return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE;
return baosInMemory.size() - HConstants.HFILEBLOCK_HEADER_SIZE;
}
/**
@ -1194,7 +1189,7 @@ public class HFileBlock implements Cacheable {
*/
int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return uncompressedBlockBytesWithHeader.length;
return baosInMemory.size();
}
/** @return true if a block is being written */
@ -1215,29 +1210,37 @@ public class HFileBlock implements Cacheable {
}
/**
* Returns the header followed by the uncompressed data, even if using
* Clones the header followed by the uncompressed data, even if using
* compression. This is needed for storing uncompressed blocks in the block
* cache. Can be called in the "writing" state or the "block ready" state.
* Returns only the header and data, does not include checksum data.
*
* @return uncompressed block bytes for caching on write
* @return Returns a copy of uncompressed block bytes for caching on write
*/
ByteBuffer getUncompressedBufferWithHeader() {
@VisibleForTesting
ByteBuffer cloneUncompressedBufferWithHeader() {
expectState(State.BLOCK_READY);
byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
putHeader(uncompressedBlockBytesWithHeader, 0,
onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
}
/**
* Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
* Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is
* needed for storing packed blocks in the block cache. Expects calling semantics identical to
* {@link #cloneUncompressedBufferWithHeader()}. Returns only the header and data;
* does not include checksum data.
*
* @return packed block bytes for caching on write
* @return Returns a copy of block bytes for caching on write
*/
ByteBuffer getOnDiskBufferWithHeader() {
private ByteBuffer cloneOnDiskBufferWithHeader() {
expectState(State.BLOCK_READY);
return ByteBuffer.wrap(onDiskBlockBytesWithHeader);
return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
}
private void expectState(State expectedState) {
@ -1268,7 +1271,9 @@ public class HFileBlock implements Cacheable {
* the byte buffer passed into the constructor of this newly created
* block does not have checksum data even though the header minor
* version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
* 0 value in bytesPerChecksum.
* 0 value in bytesPerChecksum. This method copies the on-disk or
* uncompressed data to build the HFileBlock which is used only
* while writing blocks and caching.
*
* <p>TODO: Should there be an option where a cache can ask that hbase preserve block
* checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible
@ -1289,10 +1294,10 @@ public class HFileBlock implements Cacheable {
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
cacheConf.shouldCacheCompressed(blockType.getCategory())?
getOnDiskBufferWithHeader() :
getUncompressedBufferWithHeader(),
cloneOnDiskBufferWithHeader() :
cloneUncompressedBufferWithHeader(),
FILL_HEADER, startOffset, UNSET,
onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext);
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
}
}
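
Why the cache-on-write paths above clone instead of wrapping the live buffer: onDiskBlockBytesWithHeader and baosInMemory are reused for the next block, so only a copy may safely outlive finishBlock(). A small illustration of the distinction (hypothetical class; only the clone method mirrors what the patch adds):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

    public class CacheOnWriteCloneSketch {
      private final ByteArrayOutputStream onDiskBlockBytesWithHeader = new ByteArrayOutputStream(1024);

      /** Safe to cache: the copy is independent of the stream that is reused for the next block. */
      ByteBuffer cloneOnDiskBufferWithHeader() {
        return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
      }

      /** Not safe to cache: the next block overwrites this shared backing array in place. */
      ByteBuffer unsafeViewOfOnDiskBuffer() {
        return ByteBuffer.wrap(onDiskBlockBytesWithHeader.getBuffer(), 0,
            onDiskBlockBytesWithHeader.size());
      }
    }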

TestHFileBlock.java

@ -390,7 +390,7 @@ public class TestHFileBlock {
writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
final int encodedSize = encodedResultWithHeader.length - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
@ -798,7 +798,7 @@ public class TestHFileBlock {
totalSize += hbw.getOnDiskSizeWithHeader();
if (cacheOnWrite)
expectedContents.add(hbw.getUncompressedBufferWithHeader());
expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
if (detailedLogging) {
LOG.info("Written block #" + i + " of type " + bt