HBASE-21657 PrivateCellUtil#estimatedSerializedSizeOf has been the bottleneck in 100% scan case

This commit is contained in:
huzheng 2019-01-07 14:41:18 +08:00
parent 87d4ab8669
commit fdfbd9a21c
27 changed files with 114 additions and 81 deletions

View File

@ -323,7 +323,7 @@ public final class ConnectionUtils {
long estimatedHeapSizeOfResult = 0;
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
estimatedHeapSizeOfResult += PrivateCellUtil.estimatedSizeOfCell(cell);
estimatedHeapSizeOfResult += cell.heapSize();
}
return estimatedHeapSizeOfResult;
}

View File

@ -499,8 +499,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
heapsize += ClassSize.align(ClassSize.ARRAY +
size * ClassSize.REFERENCE);
for(Cell cell : entry.getValue()) {
heapsize += PrivateCellUtil.estimatedSizeOfCell(cell);
for (Cell cell : entry.getValue()) {
heapsize += cell.heapSize();
}
}
heapsize += getAttributeSize();

View File

@ -859,7 +859,7 @@ public class Result implements CellScannable, CellScanner {
return size;
}
for (Cell c : result.rawCells()) {
size += PrivateCellUtil.estimatedSizeOfCell(c);
size += c.heapSize();
}
return size;
}

View File

@ -243,6 +243,11 @@ public class KeyOnlyFilter extends FilterBase {
}
}
@Override
public int getSerializedSize() {
return cell.getSerializedSize();
}
@Override
public byte[] getTagsArray() {
return HConstants.EMPTY_BYTE_ARRAY;
@ -257,6 +262,11 @@ public class KeyOnlyFilter extends FilterBase {
public int getTagsLength() {
return 0;
}
@Override
public long heapSize() {
return cell.heapSize();
}
}
static class KeyOnlyByteBufferExtendedCell extends ByteBufferExtendedCell {

View File

@ -75,6 +75,11 @@ public class TestHBaseRpcControllerImpl {
// Fake out a Cell. All this Cell has is a value that is an int in size and equal
// to the above 'index' param serialized as an int.
return new Cell() {
@Override
public long heapSize() {
return 0;
}
private final int i = index;
@Override
@ -164,6 +169,11 @@ public class TestHBaseRpcControllerImpl {
return Bytes.SIZEOF_INT;
}
@Override
public int getSerializedSize() {
return 0;
}
@Override
public int getTagsOffset() {
// unused

View File

@ -286,6 +286,11 @@ public class ByteBufferKeyValue extends ByteBufferExtendedCell {
return getKeyLength() + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
@Override
public int getSerializedSize() {
return this.length;
}
@Override
public void write(ByteBuffer buf, int offset) {
ByteBufferUtils.copyFromBufferToBuffer(this.buf, buf, this.offset, offset, this.length);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.yetus.audience.InterfaceAudience;
@ -59,7 +60,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* </p>
*/
@InterfaceAudience.Public
public interface Cell {
public interface Cell extends HeapSize {
//1) Row
@ -171,6 +172,11 @@ public interface Cell {
*/
int getValueLength();
/**
* @return Serialized size (defaults to include tag length if has some tags).
*/
int getSerializedSize();
/**
* Contiguous raw bytes representing tags that may start at any index in the containing array.
* @return the tags byte array

View File

@ -1072,11 +1072,7 @@ public final class CellUtil {
*/
@Deprecated
public static long estimatedHeapSizeOf(final Cell cell) {
if (cell instanceof HeapSize) {
return ((HeapSize) cell).heapSize();
}
// TODO: Add sizing of references that hold the row, family, etc., arrays.
return estimatedSerializedSizeOf(cell);
return cell.heapSize();
}
/********************* tags *************************************/

View File

@ -90,6 +90,7 @@ public interface ExtendedCell extends RawCell, HeapSize {
/**
* @return Serialized size (defaults to include tag length).
*/
@Override
default int getSerializedSize() {
return getSerializedSize(true);
}

View File

@ -2322,6 +2322,11 @@ public class KeyValue implements ExtendedCell, Cloneable {
return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
}
@Override
public int getSerializedSize() {
return this.length;
}
@Override
public void write(ByteBuffer buf, int offset) {
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.bytes, this.offset, this.length);

View File

@ -746,11 +746,14 @@ public class KeyValueUtil {
}
public static int getSerializedSize(Cell cell, boolean withTags) {
if (withTags) {
return cell.getSerializedSize();
}
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).getSerializedSize(withTags);
}
return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
cell.getValueLength(), cell.getTagsLength(), withTags);
cell.getValueLength(), cell.getTagsLength(), withTags);
}
public static int oswrite(final Cell cell, final OutputStream out, final boolean withTags)

View File

@ -31,7 +31,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
@ -250,7 +249,7 @@ public final class PrivateCellUtil {
@Override
public long heapSize() {
long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell);
long sum = HEAP_SIZE_OVERHEAD + cell.heapSize();
if (this.tags != null) {
sum += ClassSize.sizeOf(this.tags);
}
@ -446,7 +445,7 @@ public final class PrivateCellUtil {
@Override
public long heapSize() {
long sum = HEAP_SIZE_OVERHEAD + estimatedSizeOfCell(cell);
long sum = HEAP_SIZE_OVERHEAD + cell.heapSize();
// this.tags is on heap byte[]
if (this.tags != null) {
sum += ClassSize.sizeOf(this.tags);
@ -705,7 +704,7 @@ public final class PrivateCellUtil {
@Override
public ExtendedCell deepClone() {
Cell clonedBaseCell = ((ExtendedCell) this.cell).deepClone();
Cell clonedBaseCell = this.cell.deepClone();
if (clonedBaseCell instanceof ByteBufferExtendedCell) {
return new ValueAndTagRewriteByteBufferExtendedCell(
(ByteBufferExtendedCell) clonedBaseCell, this.value, this.tags);
@ -2737,34 +2736,7 @@ public final class PrivateCellUtil {
* actual cell length.
*/
public static int estimatedSerializedSizeOf(final Cell cell) {
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).getSerializedSize(true) + Bytes.SIZEOF_INT;
}
return getSumOfCellElementLengths(cell) +
// Use the KeyValue's infrastructure size presuming that another implementation would have
// same basic cost.
KeyValue.ROW_LENGTH_SIZE + KeyValue.FAMILY_LENGTH_SIZE +
// Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
Bytes.SIZEOF_INT;
}
/**
* @param cell
* @return Sum of the lengths of all the elements in a Cell; does not count in any infrastructure
*/
private static int getSumOfCellElementLengths(final Cell cell) {
return getSumOfCellKeyElementLengths(cell) + cell.getValueLength() + cell.getTagsLength();
}
/**
* @param cell
* @return Sum of all elements that make up a key; does not include infrastructure, tags or
* values.
*/
private static int getSumOfCellKeyElementLengths(final Cell cell) {
return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
+ KeyValue.TIMESTAMP_TYPE_SIZE;
return cell.getSerializedSize() + Bytes.SIZEOF_INT;
}
/**
@ -2778,23 +2750,6 @@ public final class PrivateCellUtil {
+ KeyValue.KEY_INFRASTRUCTURE_SIZE;
}
/**
* This is an estimate of the heap space occupied by a cell. When the cell is of type
* {@link HeapSize} we call {@link HeapSize#heapSize()} so cell can give a correct value. In other
* cases we just consider the bytes occupied by the cell components ie. row, CF, qualifier,
* timestamp, type, value and tags.
* Note that this can be the JVM heap space (on-heap) or the OS heap (off-heap)
* @param cell
* @return estimate of the heap space
*/
public static long estimatedSizeOfCell(final Cell cell) {
if (cell instanceof HeapSize) {
return ((HeapSize) cell).heapSize();
}
// TODO: Add sizing of references that hold the row, family, etc., arrays.
return estimatedSerializedSizeOf(cell);
}
/**
* This method exists just to encapsulate how we serialize keys. To be replaced by a factory that
* we query to figure what the Cell implementation is and then, what serialization engine to use

View File

@ -62,4 +62,14 @@ public class SizeCachedKeyValue extends KeyValue {
public long heapSize() {
return super.heapSize() + FIXED_OVERHEAD;
}
/**
* Override by just returning the length for saving cost of method dispatching. If not, it will
* call {@link ExtendedCell#getSerializedSize()} firstly, then forward to
* {@link SizeCachedKeyValue#getSerializedSize(boolean)}. (See HBASE-21657)
*/
@Override
public int getSerializedSize() {
return this.length;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
@ -383,7 +384,9 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
ByteBuffer buf = tmpPair.getFirst();
if (buf.isDirect()) {
ret = new ByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId);
ret =
tagsLength > 0 ? new ByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId)
: new NoTagsByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId);
} else {
if (tagsLength > 0) {
ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset()

View File

@ -179,6 +179,11 @@ public class TestCellUtil {
return 0;
}
@Override
public int getSerializedSize() {
return 0;
}
@Override
public byte[] getTagsArray() {
// TODO Auto-generated method stub
@ -202,6 +207,11 @@ public class TestCellUtil {
// TODO Auto-generated method stub
return 0;
}
@Override
public long heapSize() {
return 0;
}
}
/**
@ -630,6 +640,11 @@ public class TestCellUtil {
return this.kv.getValueLength();
}
@Override
public int getSerializedSize() {
return this.kv.getSerializedSize();
}
@Override
public byte[] getTagsArray() {
return this.kv.getTagsArray();
@ -644,5 +659,10 @@ public class TestCellUtil {
public int getTagsLength() {
return this.kv.getTagsLength();
}
@Override
public long heapSize() {
return this.kv.heapSize();
}
}
}

View File

@ -241,7 +241,7 @@ public class MapReduceExtendedCell extends ByteBufferExtendedCell {
@Override
public long heapSize() {
return PrivateCellUtil.estimatedSizeOfCell(cell);
return cell.heapSize();
}
@Override

View File

@ -266,7 +266,7 @@ public class HFileBlockIndex {
// Adding blockKeys
for (Cell key : blockKeys) {
heapSize += ClassSize.align(PrivateCellUtil.estimatedSizeOfCell(key));
heapSize += ClassSize.align(key.heapSize());
}
}
// Add comparator and the midkey atomicreference

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
@ -967,7 +968,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
} else {
ByteBuffer buf = blockBuffer.asSubByteBuffer(cellBufSize);
if (buf.isDirect()) {
ret = new ByteBufferKeyValue(buf, buf.position(), cellBufSize, seqId);
ret = currTagsLen > 0 ? new ByteBufferKeyValue(buf, buf.position(), cellBufSize, seqId)
: new NoTagsByteBufferKeyValue(buf, buf.position(), cellBufSize, seqId);
} else {
if (currTagsLen > 0) {
ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset() + buf.position(),

View File

@ -6771,7 +6771,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
PrivateCellUtil.estimatedSizeOfCell(cell));
cell.heapSize());
}
}

View File

@ -1365,6 +1365,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Method to account for the size of retained cells and retained data blocks.
* @param context rpc call context
* @param r result to add size.
* @param lastBlock last block to check whether we need to add the block size in context.
* @return an object that represents the last referenced block from this response.
*/
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
@ -3084,7 +3087,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
@ -3214,10 +3217,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
if (limitReached || !moreRows) {
if (LOG.isTraceEnabled()) {
LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows
+ " scannerContext: " + scannerContext);
}
// We only want to mark a ScanResponse as a heartbeat message in the event that
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
@ -3390,7 +3389,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
MutableObject<Object> lastBlock = new MutableObject<>();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>();
List<Result> results = new ArrayList<>(Math.min(rows, 512));
if (rows > 0) {
boolean done = false;
// Call coprocessor. Get region info from scanner.

View File

@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.TimeRange;
@ -369,7 +368,7 @@ public abstract class Segment implements MemStoreSizing {
}
res += indexEntryOnHeapSize(onHeap);
if(onHeap) {
res += PrivateCellUtil.estimatedSizeOfCell(cell);
res += cell.heapSize();
}
res = ClassSize.align(res);
}
@ -386,7 +385,7 @@ public abstract class Segment implements MemStoreSizing {
}
res += indexEntryOffHeapSize(offHeap);
if(offHeap) {
res += PrivateCellUtil.estimatedSizeOfCell(cell);
res += cell.heapSize();
}
res = ClassSize.align(res);
}

View File

@ -622,8 +622,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
totalBytesRead += cellSize;
// Update the progress of the scanner context
scannerContext.incrementSizeProgress(cellSize,
PrivateCellUtil.estimatedSizeOfCell(cell));
scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
scannerContext.incrementBatchProgress(1);
if (matcher.isUserScan() && totalBytesRead > maxRowSize) {

View File

@ -218,7 +218,7 @@ public class WALEdit implements HeapSize {
public long heapSize() {
long ret = ClassSize.ARRAYLIST;
for (Cell cell : cells) {
ret += PrivateCellUtil.estimatedSizeOfCell(cell);
ret += cell.heapSize();
}
return ret;
}

View File

@ -308,7 +308,7 @@ public class WALPrettyPrinter {
if (row == null || ((String) op.get("row")).equals(row)) {
actions.add(op);
}
op.put("total_size_sum", PrivateCellUtil.estimatedSizeOfCell(cell));
op.put("total_size_sum", cell.heapSize());
}
if (actions.isEmpty())
continue;

View File

@ -391,8 +391,7 @@ public class TestPartialResultsFromClientSide {
// Estimate the cell heap size. One difference is that on server side, the KV Heap size is
// estimated differently in case the cell is backed up by MSLAB byte[] (no overhead for
// backing array). Thus below calculation is a bit brittle.
CELL_HEAP_SIZE = PrivateCellUtil.estimatedSizeOfCell(result.rawCells()[0])
- (ClassSize.ARRAY+3);
CELL_HEAP_SIZE = result.rawCells()[0].heapSize() - (ClassSize.ARRAY + 3);
if (LOG.isInfoEnabled()) LOG.info("Cell heap size: " + CELL_HEAP_SIZE);
scanner.close();
}

View File

@ -155,7 +155,7 @@ public class TestServerSideScanMetricsFromClientSide {
assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == 1);
CELL_HEAP_SIZE = PrivateCellUtil.estimatedSizeOfCell(result.rawCells()[0]);
CELL_HEAP_SIZE = result.rawCells()[0].heapSize();
scanner.close();
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
@ -213,6 +214,11 @@ public class TestPassCustomCellViaRegionObserver {
Cell.Type type, byte[] value) {
return new Cell() {
@Override
public long heapSize() {
return 0;
}
private byte[] getArray(byte[] array) {
return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
}
@ -296,6 +302,11 @@ public class TestPassCustomCellViaRegionObserver {
return length(value);
}
@Override
public int getSerializedSize() {
return KeyValueUtil.getSerializedSize(this, true);
}
@Override
public byte[] getTagsArray() {
return getArray(null);