HBASE-23788 ROW_INDEX_V1 encoder should consider the secondary index size with the encoded data size tracking (#1219)
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
parent 6d9802fc2e
commit b6eefcaeb7

@@ -1122,21 +1122,16 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       }
     }
     StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
-    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
-  }
-
-  private static class BufferedDataBlockEncodingState extends EncodingState {
-    int unencodedDataSizeWritten = 0;
+    blkEncodingCtx.setEncodingState(new EncodingState());
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException {
-    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
-        .getEncodingState();
+    EncodingState state = encodingCtx.getEncodingState();
+    int posBeforeEncode = out.size();
     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
-    state.unencodedDataSizeWritten += encodedKvSize;
-    return encodedKvSize;
+    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
   }
 
   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
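
The hunk above establishes the pattern every encoder now follows: snapshot the stream position, encode, then report both the unencoded and encoded byte deltas to the shared EncodingState rather than returning a size. A minimal, self-contained sketch of that contract (SketchEncodingState is a stand-in mirroring the diff, not the HBase class):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in mirroring the accumulation this commit adds to EncodingState.
class SketchEncodingState {
  int unencodedDataSizeWritten = 0; // raw cell bytes, before any block encoding
  int encodedDataSizeWritten = 0;   // bytes that actually land in the block

  void postCellEncode(int unencodedCellSize, int encodedCellSize) {
    unencodedDataSizeWritten += unencodedCellSize;
    encodedDataSizeWritten += encodedCellSize;
  }
}

public class EncodeContractSketch {
  public static void main(String[] args) throws IOException {
    SketchEncodingState state = new SketchEncodingState();
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());

    int cellSize = 25;                // e.g. a 24-byte KeyValue plus 1 MVCC byte
    int posBeforeEncode = out.size(); // snapshot, as in encode() above
    out.write(new byte[cellSize]);    // stand-in for internalEncode(...)
    out.writeInt(0);                  // stand-in for a 4-byte row-index entry

    // The encoded delta (29) exceeds the unencoded size (25) by the index
    // entry -- exactly the bytes ROW_INDEX_V1 used to leave uncounted.
    state.postCellEncode(cellSize, out.size() - posBeforeEncode);
    System.out.println(state.unencodedDataSizeWritten + " / " + state.encodedDataSizeWritten);
  }
}
```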
@@ -1145,12 +1140,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
   @Override
   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
       byte[] uncompressedBytesWithHeader) throws IOException {
-    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
-        .getEncodingState();
+    EncodingState state = encodingCtx.getEncodingState();
     // Write the unencodedDataSizeWritten (with header size)
     Bytes.putInt(uncompressedBytesWithHeader,
         HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
-        state.unencodedDataSizeWritten);
+        state.getUnencodedDataSizeWritten());
     postEncoding(encodingCtx);
   }
 
@@ -50,9 +50,10 @@ public interface DataBlockEncoder {
 
   /**
    * Encodes a KeyValue.
-   * @return unencoded kv size written
+   * After the encode, {@link EncodingState#postCellEncode(int, int)} needs to be called to keep
+   * track of the encoded and unencoded data size
    */
-  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException;
 
   /**
@@ -32,6 +32,14 @@ public class EncodingState {
    */
   protected Cell prevCell = null;
 
+  // Size of actual data being written. Not considering the block encoding/compression. This
+  // includes the header size also.
+  protected int unencodedDataSizeWritten = 0;
+
+  // Size of actual data being written. considering the block encoding. This
+  // includes the header size also.
+  protected int encodedDataSizeWritten = 0;
+
   public void beforeShipped() {
     if (this.prevCell != null) {
       // can't use KeyValueUtil#toNewKeyCell, because we need both key and value
@@ -39,4 +47,17 @@ public class EncodingState {
       this.prevCell = KeyValueUtil.copyToNewKeyValue(this.prevCell);
     }
   }
+
+  public void postCellEncode(int unencodedCellSizeWritten, int encodedCellSizeWritten) {
+    this.unencodedDataSizeWritten += unencodedCellSizeWritten;
+    this.encodedDataSizeWritten += encodedCellSizeWritten;
+  }
+
+  public int getUnencodedDataSizeWritten() {
+    return unencodedDataSizeWritten;
+  }
+
+  public int getEncodedDataSizeWritten() {
+    return encodedDataSizeWritten;
+  }
 }
@@ -79,12 +79,12 @@ public class RowIndexCodecV1 extends AbstractDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
       DataOutputStream out) throws IOException {
     RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
         .getEncodingState();
     RowIndexEncoderV1 encoder = state.encoder;
-    return encoder.write(cell);
+    encoder.write(cell);
   }
 
   @Override
@@ -15,6 +15,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +39,21 @@ public class RowIndexEncoderV1 {
     this.context = encodingCtx;
   }
 
-  public int write(Cell cell) throws IOException {
+  public void write(Cell cell) throws IOException {
     // checkRow uses comparator to check we are writing in order.
+    int extraBytesForRowIndex = 0;
+
     if (!checkRow(cell)) {
       if (startOffset < 0) {
         startOffset = out.size();
       }
       rowsOffsetBAOS.writeInt(out.size() - startOffset);
+      // added for the int written in the previous line
+      extraBytesForRowIndex = Bytes.SIZEOF_INT;
     }
     lastCell = cell;
-    return encoder.write(cell);
+    int size = encoder.write(cell);
+    context.getEncodingState().postCellEncode(size, size + extraBytesForRowIndex);
  }
 
   protected boolean checkRow(final Cell cell) throws IOException {
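
This hunk is the core of the fix. encoder.write(cell) returns only the cell's serialized size, while the rowsOffsetBAOS.writeInt(...) call above it adds 4 more bytes of row index per row that the old return-value plumbing never saw; postCellEncode now reports both figures. A rough feel for the discrepancy, using the 24-byte cells from the new test (illustrative arithmetic, not HBase code):

```java
public class RowIndexUndercount {
  public static void main(String[] args) {
    int cellSize = 24, mvcc = 1, rowIndexEntry = 4; // Bytes.SIZEOF_INT
    int tracked = cellSize + mvcc;        // what write(cell) used to report: 25
    int actual = tracked + rowIndexEntry; // what the block really grows by: 29
    // ~14% of every row was invisible to the block-size check, which is why
    // ROW_INDEX_V1 blocks overshot their 1024-byte target by roughly 20%.
    System.out.printf("tracked=%d actual=%d untracked=%.0f%%%n",
        tracked, actual, 100.0 * rowIndexEntry / actual);
  }
}
```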
@@ -798,14 +798,6 @@ public class HFileBlock implements Cacheable {
    */
   private DataOutputStream userDataStream;
 
-  // Size of actual data being written. Not considering the block encoding/compression. This
-  // includes the header size also.
-  private int unencodedDataSizeWritten;
-
-  // Size of actual data being written. considering the block encoding. This
-  // includes the header size also.
-  private int encodedDataSizeWritten;
-
   /**
    * Bytes to be written to the file system, including the header. Compressed
    * if compression is turned on. It also includes the checksum data that
@@ -911,8 +903,6 @@ public class HFileBlock implements Cacheable {
     if (newBlockType == BlockType.DATA) {
       this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
     }
-    this.unencodedDataSizeWritten = 0;
-    this.encodedDataSizeWritten = 0;
     return userDataStream;
   }
 
@@ -921,10 +911,7 @@ public class HFileBlock implements Cacheable {
    */
   void write(Cell cell) throws IOException{
     expectState(State.WRITING);
-    int posBeforeEncode = this.userDataStream.size();
-    this.unencodedDataSizeWritten +=
-        this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
-    this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
+    this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
   }
 
   /**
@@ -1155,7 +1142,7 @@ public class HFileBlock implements Cacheable {
    * @return the number of bytes written
    */
   public int encodedBlockSizeWritten() {
-    return state != State.WRITING ? 0 : this.encodedDataSizeWritten;
+    return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
   }
 
   /**
@@ -1166,7 +1153,7 @@ public class HFileBlock implements Cacheable {
    * @return the number of bytes written
    */
   int blockSizeWritten() {
-    return state != State.WRITING ? 0 : this.unencodedDataSizeWritten;
+    return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
   }
 
   /**
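
With the Writer's private counters deleted, these two accessors are the only path size information takes from the encoding state back to the block writer, and the encoded figure is what the block-boundary decision consumes. A compact sketch of that decision under ROW_INDEX_V1's corrected 29-bytes-per-row accounting (shouldFinishBlock and the 1024-byte constant are illustrative, not the actual writer API):

```java
public class BoundarySketch {
  private static final int BLOCK_SIZE = 1024;
  private int encodedDataSizeWritten; // fed by postCellEncode in the real code

  int encodedBlockSizeWritten() {
    return encodedDataSizeWritten;
  }

  boolean shouldFinishBlock() {
    return encodedBlockSizeWritten() >= BLOCK_SIZE;
  }

  public static void main(String[] args) {
    BoundarySketch block = new BoundarySketch();
    int rows = 0;
    while (!block.shouldFinishBlock()) {
      block.encodedDataSizeWritten += 29; // cell (25) + row-index entry (4)
      rows++;
    }
    // Prints "36 rows, 1044 bytes": ~2% over target. The old 25-byte
    // accounting ran to 41 rows and landed ~20% over target.
    System.out.println(rows + " rows, " + block.encodedBlockSizeWritten() + " bytes");
  }
}
```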
@@ -52,10 +52,9 @@ public interface HFileDataBlockEncoder {
    * @param cell
    * @param encodingCtx
    * @param out
-   * @return unencoded kv size
    * @throws IOException
    */
-  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException;
 
   /**
@@ -91,9 +91,9 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException {
-    return this.encoding.getEncoder().encode(cell, encodingCtx, out);
+    this.encoding.getEncoder().encode(cell, encodingCtx, out);
   }
 
   @Override
@@ -47,12 +47,13 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
       DataOutputStream out) throws IOException {
     NoneEncodingState state = (NoneEncodingState) encodingCtx
         .getEncodingState();
     NoneEncoder encoder = state.encoder;
-    return encoder.write(cell);
+    int size = encoder.write(cell);
+    state.postCellEncode(size, size);
   }
 
   @Override
@@ -99,7 +100,8 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
           + "encoding context.");
     }
 
-    HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    HFileBlockDefaultEncodingContext encodingCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
     encodingCtx.prepareEncoding(out);
 
     NoneEncoder encoder = new NoneEncoder(out, encodingCtx);
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestRowIndexV1DataEncoder {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowIndexV1DataEncoder.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private Configuration conf;
+  private FileSystem fs;
+  private DataBlockEncoding dataBlockEncoding;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    fs = FileSystem.get(conf);
+    dataBlockEncoding = DataBlockEncoding.ROW_INDEX_V1;
+  }
+
+  @Test
+  public void testBlockCountWritten() throws IOException {
+    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
+    final int entryCount = 10000;
+    writeDataToHFile(hfilePath, entryCount);
+  }
+
+  private void writeDataToHFile(Path hfilePath, int entryCount) throws IOException {
+    HFileContext context =
+      new HFileContextBuilder().withBlockSize(1024).withDataBlockEncoding(dataBlockEncoding)
+        .withCellComparator(CellComparatorImpl.COMPARATOR).build();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Writer writer =
+      new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath).withFileContext(context)
+        .create();
+
+    List<KeyValue> keyValues = new ArrayList<>(entryCount);
+
+    writeKeyValues(entryCount, writer, keyValues);
+
+    FSDataInputStream fsdis = fs.open(hfilePath);
+
+    long fileSize = fs.getFileStatus(hfilePath).getLen();
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
+
+    // HBASE-23788
+    // kv size = 24 bytes, block size = 1024 bytes
+    // per row encoded data written = (4 (Row index) + 24 (Cell size) + 1 (MVCC)) bytes = 29 bytes
+    // creating block size of (29 * 36) bytes = 1044 bytes
+    // Number of blocks = ceil((29 * 10000) / 1044) = 278
+    // Without the patch it would have produced 244 blocks (each block of 1236 bytes)
+    // Earlier this would create blocks ~20% greater than the block size of 1024 bytes
+    // After this patch actual block size is ~2% greater than the block size of 1024 bytes
+    Assert.assertEquals(278, trailer.getDataIndexCount());
+  }
+
+  private void writeKeyValues(int entryCount, HFile.Writer writer, List<KeyValue> keyValues)
+    throws IOException {
+    for (int i = 0; i < entryCount; ++i) {
+      byte[] keyBytes = intToBytes(i);
+
+      byte[] valueBytes = new byte[0];
+      KeyValue keyValue = new KeyValue(keyBytes, null, null, valueBytes);
+
+      writer.append(keyValue);
+      keyValues.add(keyValue);
+    }
+    writer.close();
+  }
+
+  private byte[] intToBytes(final int i) {
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(i);
+    return bb.array();
+  }
+}
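
The asserted 278 follows from the figures in the test's comment, and the pre-patch 244 can be reproduced the same way: the old accounting filled a block until 25 tracked bytes per row crossed 1024 (41 rows), the new accounting until 29 bytes per row do (36 rows). A simplified closed-form check that ignores block header overhead (illustrative only):

```java
public class BlockCountArithmetic {
  // Smallest n with n * perRow >= blockSize, i.e. ceil(blockSize / perRow).
  static int rowsPerBlock(int perRow, int blockSize) {
    return (blockSize + perRow - 1) / perRow;
  }

  public static void main(String[] args) {
    int entries = 10000, blockSize = 1024;
    int oldRows = rowsPerBlock(25, blockSize); // 41: row-index bytes untracked
    int newRows = rowsPerBlock(29, blockSize); // 36: row-index bytes tracked
    System.out.println((entries + oldRows - 1) / oldRows); // 244 blocks before the patch
    System.out.println((entries + newRows - 1) / newRows); // 278 blocks, as asserted
  }
}
```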