HBASE-27232 Fix checking for encoded block size when deciding if bloc… (#4640)
Signed-off-by: Andor Molnár <andor@cloudera.com>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Ankit Singhal <ankit@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit d5ed8f59e5)
parent eef5edb403
commit 80c33e0d46
@@ -172,8 +172,9 @@ public class HFileWriterImpl implements HFile.Writer {
     }
     closeOutputStream = path != null;
     this.cacheConf = cacheConf;
-    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1f);
+    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
     this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
+
     finishInit(conf);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
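For context, a minimal sketch of what the changed default means; the property key and the limit computation are taken from the hunk above, while the wrapper class and the 64 KB blocksize are illustrative assumptions. With the default ratio now 0f, encodedBlockSizeLimit evaluates to 0, so the encoded-size check is effectively off unless the ratio is configured explicitly.

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch, not part of the patch: how the limit is derived from the ratio.
    public class EncodedLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        int blocksize = 64 * 1024; // assumed HFile blocksize for the example
        // Old default 1f -> limit == blocksize; new default 0f -> limit == 0.
        float ratio = conf.getFloat("hbase.writer.unified.encoded.blocksize.ratio", 0f);
        int encodedBlockSizeLimit = (int) (blocksize * ratio);
        System.out.println("encodedBlockSizeLimit = " + encodedBlockSizeLimit);
      }
    }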
@@ -309,12 +310,16 @@ public class HFileWriterImpl implements HFile.Writer {
    * At a block boundary, write all the inline blocks and opens new block.
    */
   protected void checkBlockBoundary() throws IOException {
-    // For encoder like prefixTree, encoded size is not available, so we have to compare both
-    // encoded size and unencoded size to blocksize limit.
-    if (
-      blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit
-        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize()
-    ) {
+    boolean shouldFinishBlock = false;
+    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
+    // and we should use the encoding ratio
+    if (encodedBlockSizeLimit > 0) {
+      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
+    } else {
+      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
+        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
+    }
+    if (shouldFinishBlock) {
       finishBlock();
       writeInlineBlocks(false);
       newBlock();
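A minimal sketch of how a deployment would opt back in to encoded-size-based block boundaries under the new logic; only the property key quoted in the comment above is taken from the patch, the rest is an illustrative assumption. With a ratio greater than 0, checkBlockBoundary finishes a block once the encoded size reaches ratio × blocksize; with the default of 0 it falls back to comparing both the encoded and unencoded sizes against the full blocksize.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative sketch: enabling the ratio-based check before creating writers.
    public class EnableEncodedRatioSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Any value > 0 turns the encoded-size check on; 1.0f means "finish the
        // block once the encoded bytes reach the configured blocksize".
        conf.setFloat("hbase.writer.unified.encoded.blocksize.ratio", 1.0f);
      }
    }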
@@ -67,6 +67,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
@@ -1137,4 +1140,53 @@ public class TestHStoreFile {
     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
     assertArrayEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
   }
+
+  @Test
+  public void testDataBlockSizeEncoded() throws Exception {
+    // Make up a directory hierarchy that has a regiondir ("7e0102") and familyname.
+    Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
+    Path path = new Path(dir, "1234567890");
+
+    DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
+
+    conf.setDouble("hbase.writer.unified.encoded.blocksize.ratio", 1);
+
+    cacheConf = new CacheConfig(conf);
+    HFileContext meta =
+      new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
+        .withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo).build();
+    // Make a store file and write data to it.
+    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+      .withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
+    writeStoreFile(writer);
+
+    HStoreFile storeFile =
+      new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
+    storeFile.initReader();
+    StoreFileReader reader = storeFile.getReader();
+
+    Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+    byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
+    assertEquals(dataBlockEncoderAlgo.name(), Bytes.toString(value));
+
+    HFile.Reader fReader =
+      HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
+
+    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
+    long fileSize = fs.getFileStatus(writer.getPath()).getLen();
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
+    long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
+    HFileBlock block;
+    while (offset <= max) {
+      block = fReader.readBlock(offset, -1, /* cacheBlock */
+        false, /* pread */ false, /* isCompaction */ false, /* updateCacheMetrics */
+        false, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      double diff = block.getOnDiskSizeWithHeader() - BLOCKSIZE_SMALL;
+      if (offset <= max) {
+        assertTrue(diff >= 0 && diff < (BLOCKSIZE_SMALL * 0.05));
+      }
+    }
+  }
+
 }
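As a rough reading of the final assertion, assuming BLOCKSIZE_SMALL is the 8 KB value the test class typically uses (an assumption, not quoted from this diff): each data block except possibly the last must land at or above the configured blocksize on disk, but less than 5% over it.

    // Illustrative arithmetic only; 8192 is an assumed value for BLOCKSIZE_SMALL.
    public class BlockSizeToleranceSketch {
      public static void main(String[] args) {
        int blocksizeSmall = 8192;
        double maxDiff = blocksizeSmall * 0.05; // about 410 bytes of slack
        System.out.println("accepted on-disk block size range: ["
          + blocksizeSmall + ", " + (blocksizeSmall + maxDiff) + ")");
      }
    }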