HBASE-17647 OffheapKeyValue#heapSize() implementation is wrong.

This commit is contained in:
anoopsamjohn 2017-02-23 11:29:10 +05:30
parent 335cde3415
commit ff045cab84
9 changed files with 81 additions and 44 deletions

View File

@ -252,7 +252,10 @@ public class ByteBufferKeyValue extends ByteBufferCell implements ExtendedCell {
@Override
public long heapSize() {
return ClassSize.align(FIXED_OVERHEAD + ClassSize.align(length));
if (this.buf.hasArray()) {
return ClassSize.align(FIXED_OVERHEAD + length);
}
return ClassSize.align(FIXED_OVERHEAD);
}
@Override

View File

@ -772,6 +772,7 @@ public final class CellUtil {
@Override
public long heapSize() {
long sum = HEAP_SIZE_OVERHEAD + CellUtil.estimatedHeapSizeOf(cell);
// this.tags is on heap byte[]
if (this.tags != null) {
sum += ClassSize.sizeOf(this.tags, this.tags.length);
}

View File

@ -667,7 +667,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
@Override
public long heapSize() {
return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
return FIXED_OVERHEAD;
}
@Override

View File

@ -569,7 +569,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
@Override
public long heapSize() {
return FIXED_OVERHEAD + rowLength + famLength + qualLength + valLength + tagsLength;
return FIXED_OVERHEAD;
}
@Override

View File

@ -6082,7 +6082,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// may need to be reset a few times if rows are being filtered out so we save the initial
// progress.
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getSizeProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
long initialTimeProgress = scannerContext.getTimeProgress();
// The loop here is used only when at some point during the next we determine
@ -6096,7 +6097,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
initialHeapSizeProgress, initialTimeProgress);
} else {
scannerContext.clearProgress();
}
@ -6198,14 +6199,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
initialHeapSizeProgress, initialTimeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
scannerContext.incrementSizeProgress(CellUtil.estimatedSerializedSizeOf(cell),
CellUtil.estimatedHeapSizeOf(cell));
}
}

View File

@ -63,7 +63,7 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
void setSizeProgress(long sizeProgress) {
void setSizeProgress(long sizeProgress, long heapSizeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
@ -78,7 +78,7 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}

View File

@ -1181,7 +1181,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
if (context != null && r != null && !r.isEmpty()) {
for (Cell c : r.rawCells()) {
context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(c));
// Since byte buffers can point all kinds of crazy places it's harder to keep track
// of which blocks are kept alive by what byte buffer.
@ -2835,7 +2835,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
contextBuilder.setSizeLimit(sizeScope, maxResultSize);
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
// of heap size occupied by cells(being read). Cell data means its key and value parts.
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);

View File

@ -104,7 +104,7 @@ public class ScannerContext {
if (limitsToCopy != null) this.limits.copy(limitsToCopy);
// Progress fields are initialized to 0
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@ -150,9 +150,11 @@ public class ScannerContext {
/**
* Progress towards the size limit has been made. Increment internal tracking of size progress
*/
void incrementSizeProgress(long size) {
long currentSize = progress.getSize();
progress.setSize(currentSize + size);
void incrementSizeProgress(long dataSize, long heapSize) {
long curDataSize = progress.getDataSize();
progress.setDataSize(curDataSize + dataSize);
long curHeapSize = progress.getHeapSize();
progress.setHeapSize(curHeapSize + heapSize);
}
/**
@ -166,22 +168,27 @@ public class ScannerContext {
return progress.getBatch();
}
long getSizeProgress() {
return progress.getSize();
long getDataSizeProgress() {
return progress.getDataSize();
}
long getHeapSizeProgress() {
return progress.getHeapSize();
}
long getTimeProgress() {
return progress.getTime();
}
void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress);
setSizeProgress(sizeProgress, heapSizeProgress);
setTimeProgress(timeProgress);
}
void setSizeProgress(long sizeProgress) {
progress.setSize(sizeProgress);
void setSizeProgress(long dataSizeProgress, long heapSizeProgress) {
progress.setDataSize(dataSizeProgress);
progress.setHeapSize(heapSizeProgress);
}
void setBatchProgress(int batchProgress) {
@ -197,7 +204,7 @@ public class ScannerContext {
* values
*/
void clearProgress() {
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
}
/**
@ -240,7 +247,8 @@ public class ScannerContext {
* @return true if the size limit can be enforced in the checker's scope
*/
boolean hasSizeLimit(LimitScope checkerScope) {
return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
return limits.canEnforceSizeLimitFromScope(checkerScope)
&& (limits.getDataSize() > 0 || limits.getHeapSize() > 0);
}
/**
@ -277,8 +285,8 @@ public class ScannerContext {
return limits.getBatch();
}
long getSizeLimit() {
return limits.getSize();
long getDataSizeLimit() {
return limits.getDataSize();
}
long getTimeLimit() {
@ -298,7 +306,8 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkSizeLimit(LimitScope checkerScope) {
return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
return hasSizeLimit(checkerScope) && (progress.getDataSize() >= limits.getDataSize()
|| progress.getHeapSize() >= limits.getHeapSize());
}
/**
@ -370,8 +379,9 @@ public class ScannerContext {
return this;
}
public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
limits.setSize(sizeLimit);
public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit) {
limits.setDataSize(dataSizeLimit);
limits.setHeapSize(heapSizeLimit);
limits.setSizeScope(sizeScope);
return this;
}
@ -515,7 +525,11 @@ public class ScannerContext {
int batch = DEFAULT_BATCH;
LimitScope sizeScope = DEFAULT_SCOPE;
long size = DEFAULT_SIZE;
// The sum of cell data sizes(key + value). The Cell data might be in on heap or off heap area.
long dataSize = DEFAULT_SIZE;
// The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as
// such AND data cells of Cells which are in on heap area.
long heapSize = DEFAULT_SIZE;
LimitScope timeScope = DEFAULT_SCOPE;
long time = DEFAULT_TIME;
@ -526,14 +540,15 @@ public class ScannerContext {
LimitFields() {
}
LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
setFields(batch, sizeScope, size, timeScope, time);
LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
long time) {
setFields(batch, sizeScope, size, heapSize, timeScope, time);
}
void copy(LimitFields limitsToCopy) {
if (limitsToCopy != null) {
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
limitsToCopy.getTimeScope(), limitsToCopy.getTime());
setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime());
}
}
@ -541,12 +556,14 @@ public class ScannerContext {
* Set all fields together.
* @param batch
* @param sizeScope
* @param size
* @param dataSize
*/
void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize,
LimitScope timeScope, long time) {
setBatch(batch);
setSizeScope(sizeScope);
setSize(size);
setDataSize(dataSize);
setHeapSize(heapSize);
setTimeScope(timeScope);
setTime(time);
}
@ -567,12 +584,20 @@ public class ScannerContext {
return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
}
long getSize() {
return this.size;
long getDataSize() {
return this.dataSize;
}
void setSize(long size) {
this.size = size;
long getHeapSize() {
return this.heapSize;
}
void setDataSize(long dataSize) {
this.dataSize = dataSize;
}
void setHeapSize(long heapSize) {
this.heapSize = heapSize;
}
/**
@ -635,8 +660,11 @@ public class ScannerContext {
sb.append("batch:");
sb.append(batch);
sb.append(", size:");
sb.append(size);
sb.append(", dataSize:");
sb.append(dataSize);
sb.append(", heapSize:");
sb.append(heapSize);
sb.append(", sizeScope:");
sb.append(sizeScope);

View File

@ -603,10 +603,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Update local tracking information
count++;
totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
totalBytesRead += cellSize;
// Update the progress of the scanner context
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
scannerContext.incrementSizeProgress(cellSize, CellUtil.estimatedHeapSizeOf(cell));
scannerContext.incrementBatchProgress(1);
if (matcher.isUserScan() && totalBytesRead > maxRowSize) {