HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes (#4675)

Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
Signed-off-by: Ankit Singhal <ankit@apache.org>
This commit is contained in:
Wellington Ramos Chevreuil 2022-08-15 22:35:35 +01:00 committed by GitHub
parent 8ec02c025e
commit eaa47c5cd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 283 additions and 6 deletions

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Allows for defining different compression rate predicates on its implementing classes. Useful
* when compression is in place, and we want to define block size based on the compressed size,
* rather than the default behaviour that considers the uncompressed size only. Since we don't
* actually know the compressed size until we actual apply compression in the block byte buffer, we
* need to "predicate" this compression rate and minimize compression execution to avoid excessive
* resources usage. Different approaches for predicating the compressed block size can be defined by
* implementing classes. The <code>updateLatestBlockSizes</code> allows for updating uncompressed
* and compressed size values, and is called during block finishing (when we finally apply
* compression on the block data). Final block size predicate logic is implemented in
* <code>shouldFinishBlock</code>, which is called by the block writer once uncompressed size has
* reached the configured BLOCK size, and additional checks should be applied to decide if the block
* can be finished.
*/
@InterfaceAudience.Private
public interface BlockCompressedSizePredicator {
String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator";
String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";
/**
* Updates the predicator with both compressed and uncompressed sizes of latest block written. To
* be called once the block is finshed and flushed to disk after compression.
* @param context the HFileContext containg the configured max block size.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed);
/**
* Decides if the block should be finished based on the comparison of its uncompressed size
* against an adjusted size based on a predicated compression factor.
* @param uncompressed true if the block should be finished. n
*/
boolean shouldFinishBlock(int uncompressed);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
import io.opentelemetry.api.common.Attributes;
@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -463,7 +465,7 @@ public class HFileBlock implements Cacheable {
}
/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}
@ -740,6 +742,10 @@ public class HFileBlock implements Cacheable {
BLOCK_READY
}
private int maxSizeUnCompressed;
private BlockCompressedSizePredicator compressedSizePredicator;
/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;
@ -818,11 +824,11 @@ public class HFileBlock implements Cacheable {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
}
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@ -845,6 +851,10 @@ public class HFileBlock implements Cacheable {
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
new Configuration(conf));
this.maxSizeUnCompressed = maxSizeUnCompressed;
}
/**
@ -897,6 +907,15 @@ public class HFileBlock implements Cacheable {
finishBlock();
}
public boolean checkBoundariesWithPredicate() {
int rawBlockSize = encodedBlockSizeWritten();
if (rawBlockSize >= maxSizeUnCompressed) {
return true;
} else {
return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
}
}
/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@ -911,6 +930,11 @@ public class HFileBlock implements Cacheable {
userDataStream.flush();
prevOffset = prevOffsetByType[blockType.getId()];
// We need to cache the unencoded/uncompressed size before changing the block state
int rawBlockSize = 0;
if (this.getEncodingState() != null) {
rawBlockSize = blockSizeWritten();
}
// We need to set state before we can package the block up for cache-on-write. In a way, the
// block is ready, but not yet encoded or compressed.
state = State.BLOCK_READY;
@ -931,6 +955,10 @@ public class HFileBlock implements Cacheable {
onDiskBlockBytesWithHeader.reset();
onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
// Update raw and compressed sizes in the predicate
compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
onDiskBlockBytesWithHeader.size());
// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
@ -938,6 +966,7 @@ public class HFileBlock implements Cacheable {
// Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
if (onDiskChecksum.length != numBytes) {
onDiskChecksum = new byte[numBytes];
}
@ -1077,7 +1106,7 @@ public class HFileBlock implements Cacheable {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
@ -1101,7 +1130,7 @@ public class HFileBlock implements Cacheable {
* block at the moment. Note that this will return zero in the "block ready" state as well.
* @return the number of bytes written
*/
int blockSizeWritten() {
public int blockSizeWritten() {
return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@ -292,7 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@ -319,6 +322,7 @@ public class HFileWriterImpl implements HFile.Writer {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This BlockCompressedSizePredicator implementation adjusts the block size limit based on the
* compression rate of the block contents read so far. For the first block, adjusted size would be
* zero, so it performs a compression of current block contents and calculate compression rate and
* adjusted size. For subsequent blocks, decision whether the block should be finished or not will
* be based on the compression rate calculated for the previous block.
*/
@InterfaceAudience.Private
public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator {
private int adjustedBlockSize;
private int compressionRatio = 1;
private int configuredMaxBlockSize;
/**
* Recalculates compression rate for the last block and adjusts the block size limit as:
* BLOCK_SIZE * (uncompressed/compressed).
* @param context HFIleContext containing the configured max block size.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
configuredMaxBlockSize = context.getBlocksize();
compressionRatio = uncompressed / compressed;
adjustedBlockSize = context.getBlocksize() * compressionRatio;
}
/**
* Returns <b>true</b> if the passed uncompressed size is larger than the limit calculated by
* <code>updateLatestBlockSizes</code>.
* @param uncompressed true if the block should be finished. n
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
if (uncompressed >= configuredMaxBlockSize) {
return uncompressed >= adjustedBlockSize;
}
return false;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This BlockCompressedSizePredicator implementation doesn't actually performs any predicate and
* simply returns <b>true</b> on <code>shouldFinishBlock</code>. This is the default implementation
* if <b>hbase.block.compressed.size.predicator</b> property is not defined.
*/
@InterfaceAudience.Private
public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator {
/**
* Empty implementation. Does nothing.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
}
/**
* Dummy implementation that always returns true. This means, we will be only considering the
* block uncompressed size for deciding when to finish a block.
* @param uncompressed true if the block should be finished. n
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
return true;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -39,6 +40,7 @@ import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
@ -74,8 +77,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -189,6 +194,24 @@ public class TestHStoreFile {
}
}
public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier,
int rounds) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
try {
for (int i = 0; i < rounds; i++) {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
byte[] b = new byte[] { (byte) d, (byte) e };
byte[] key = new byte[] { (byte) i };
writer.append(new KeyValue(key, fam, qualifier, now, b));
}
}
}
} finally {
writer.close();
}
}
/**
* Test that our mechanism of writing store files in one region to reference store files in other
* regions works.
@ -1193,4 +1216,55 @@ public class TestHStoreFile {
}
}
@Test
public void testDataBlockSizeCompressed() throws Exception {
conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR,
PreviousBlockCompressionRatePredicator.class.getName());
testDataBlockSizeWithCompressionRatePredicator(11,
(s, c) -> (c > 1 && c < 11) ? s >= BLOCKSIZE_SMALL * 10 : true);
}
@Test
public void testDataBlockSizeUnCompressed() throws Exception {
conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class.getName());
testDataBlockSizeWithCompressionRatePredicator(200, (s, c) -> s < BLOCKSIZE_SMALL * 10);
}
private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCount,
BiFunction<Integer, Integer, Boolean> validation) throws Exception {
Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
Path path = new Path(dir, "1234567890");
DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
cacheConf = new CacheConfig(conf);
HFileContext meta =
new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo)
.withCompression(Compression.Algorithm.GZ).build();
// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
writeLargeStoreFile(writer, Bytes.toBytes(name.getMethodName()),
Bytes.toBytes(name.getMethodName()), 200);
writer.close();
HStoreFile storeFile =
new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
storeFile.initReader();
HFile.Reader fReader =
HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
long fileSize = fs.getFileStatus(writer.getPath()).getLen();
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
HFileBlock block;
int blockCount = 0;
while (offset <= max) {
block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
offset += block.getOnDiskSizeWithHeader();
blockCount++;
assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader(), blockCount));
}
assertEquals(expectedBlockCount, blockCount);
}
}