HBASE-19074 Miscellaneous Observer cleanups

Breaks MemStoreSize into MemStoreSize (read-only) and MemStoreSizing
(read/write). MemStoreSize we allow to Coprocesors. MemStoreSizing we
use internally doing MemStore accounting.
This commit is contained in:
Michael Stack 2017-10-23 20:57:46 -07:00
parent 9716f62f43
commit cb506fd401
23 changed files with 308 additions and 222 deletions

View File

@ -880,7 +880,10 @@ public interface RegionObserver {
* Called before a {@link WALEdit} * Called before a {@link WALEdit}
* replayed for this region. * replayed for this region.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/ */
@Deprecated
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
@ -888,7 +891,10 @@ public interface RegionObserver {
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* replayed for this region. * replayed for this region.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/ */
@Deprecated
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}

View File

@ -72,8 +72,11 @@ public interface WALObserver {
* is writen to WAL. * is writen to WAL.
* *
* @return true if default behavior should be bypassed, false otherwise * @return true if default behavior should be bypassed, false otherwise
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */
// TODO: return value is not used // TODO: return value is not used
@Deprecated
default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
return false; return false;
@ -82,7 +85,10 @@ public interface WALObserver {
/** /**
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* is writen to WAL. * is writen to WAL.
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */
@Deprecated
default void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, default void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -96,14 +96,14 @@ public abstract class AbstractMemStore implements MemStore {
public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
@Override @Override
public void add(Iterable<Cell> cells, MemStoreSize memstoreSize) { public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) { for (Cell cell : cells) {
add(cell, memstoreSize); add(cell, memstoreSizing);
} }
} }
@Override @Override
public void add(Cell cell, MemStoreSize memstoreSize) { public void add(Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(cell); Cell toAdd = maybeCloneWithAllocator(cell);
boolean mslabUsed = (toAdd != cell); boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
@ -118,7 +118,7 @@ public abstract class AbstractMemStore implements MemStore {
if (!mslabUsed) { if (!mslabUsed) {
toAdd = deepCopyIfNeeded(toAdd); toAdd = deepCopyIfNeeded(toAdd);
} }
internalAdd(toAdd, mslabUsed, memstoreSize); internalAdd(toAdd, mslabUsed, memstoreSizing);
} }
private static Cell deepCopyIfNeeded(Cell cell) { private static Cell deepCopyIfNeeded(Cell cell) {
@ -129,9 +129,9 @@ public abstract class AbstractMemStore implements MemStore {
} }
@Override @Override
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize) { public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) { for (Cell cell : cells) {
upsert(cell, readpoint, memstoreSize); upsert(cell, readpoint, memstoreSizing);
} }
} }
@ -167,7 +167,11 @@ public abstract class AbstractMemStore implements MemStore {
@Override @Override
public MemStoreSize getSnapshotSize() { public MemStoreSize getSnapshotSize() {
return new MemStoreSize(this.snapshot.keySize(), this.snapshot.heapSize()); return getSnapshotSizing();
}
MemStoreSizing getSnapshotSizing() {
return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize());
} }
@Override @Override
@ -210,7 +214,7 @@ public abstract class AbstractMemStore implements MemStore {
* @param readpoint readpoint below which we can safely remove duplicate KVs * @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize * @param memstoreSize
*/ */
private void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) { private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
// Add the Cell to the MemStore // Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock // Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially // and (b) cannot safely use the MSLAB here without potentially
@ -221,7 +225,7 @@ public abstract class AbstractMemStore implements MemStore {
// must do below deep copy. Or else we will keep referring to the bigger chunk of memory and // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
// prevent it from getting GCed. // prevent it from getting GCed.
cell = deepCopyIfNeeded(cell); cell = deepCopyIfNeeded(cell);
this.active.upsert(cell, readpoint, memstoreSize); this.active.upsert(cell, readpoint, memstoreSizing);
setOldestEditTimeToNow(); setOldestEditTimeToNow();
checkActiveSize(); checkActiveSize();
} }
@ -277,8 +281,8 @@ public abstract class AbstractMemStore implements MemStore {
* @param mslabUsed whether using MSLAB * @param mslabUsed whether using MSLAB
* @param memstoreSize * @param memstoreSize
*/ */
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSize memstoreSize) { private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
active.add(toAdd, mslabUsed, memstoreSize); active.add(toAdd, mslabUsed, memstoreSizing);
setOldestEditTimeToNow(); setOldestEditTimeToNow();
checkActiveSize(); checkActiveSize();
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -55,7 +54,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment * of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM); incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
@ -65,7 +64,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta); incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
} }
@Override @Override

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -61,7 +60,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment * of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
@ -73,7 +73,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta); incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
} }
@Override @Override

View File

@ -146,12 +146,12 @@ public class CompactingMemStore extends AbstractMemStore {
*/ */
@Override @Override
public MemStoreSize size() { public MemStoreSize size() {
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSizing = new MemStoreSizing();
memstoreSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
for (Segment item : pipeline.getSegments()) { for (Segment item : pipeline.getSegments()) {
memstoreSize.incMemStoreSize(item.keySize(), item.heapSize()); memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize());
} }
return memstoreSize; return memstoreSizing;
} }
/** /**
@ -215,17 +215,17 @@ public class CompactingMemStore extends AbstractMemStore {
*/ */
@Override @Override
public MemStoreSize getFlushableSize() { public MemStoreSize getFlushableSize() {
MemStoreSize snapshotSize = getSnapshotSize(); MemStoreSizing snapshotSizing = getSnapshotSizing();
if (snapshotSize.getDataSize() == 0) { if (snapshotSizing.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) { if (compositeSnapshot) {
snapshotSize = pipeline.getPipelineSize(); snapshotSizing = pipeline.getPipelineSizing();
snapshotSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
} else { } else {
snapshotSize = pipeline.getTailSize(); snapshotSizing = pipeline.getTailSizing();
} }
} }
return snapshotSize.getDataSize() > 0 ? snapshotSize return snapshotSizing.getDataSize() > 0 ? snapshotSizing
: new MemStoreSize(this.active.keySize(), this.active.heapSize()); : new MemStoreSize(this.active.keySize(), this.active.heapSize());
} }

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -149,7 +149,7 @@ public class CompactionPipeline {
long newHeapSize = 0; long newHeapSize = 0;
if(segment != null) newHeapSize = segment.heapSize(); if(segment != null) newHeapSize = segment.heapSize();
long heapSizeDelta = suffixHeapSize - newHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize;
region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta)); region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: " LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
+ newDataSize + ". Suffix heap size: " + suffixHeapSize + newDataSize + ". Suffix heap size: " + suffixHeapSize
@ -199,14 +199,14 @@ public class CompactionPipeline {
int i = 0; int i = 0;
for (ImmutableSegment s : pipeline) { for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) { if ( s.canBeFlattened() ) {
MemStoreSize newMemstoreSize = new MemStoreSize(); // the size to be updated MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreSize); (CSLMImmutableSegment)s,idxType,newMemstoreAccounting);
replaceAtIndex(i,newS); replaceAtIndex(i,newS);
if(region != null) { if(region != null) {
// update the global memstore size counter // update the global memstore size counter
// upon flattening there is no change in the data size // upon flattening there is no change in the data size
region.addMemStoreSize(new MemStoreSize(0, newMemstoreSize.getHeapSize())); region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize()));
} }
LOG.debug("Compaction pipeline segment " + s + " was flattened"); LOG.debug("Compaction pipeline segment " + s + " was flattened");
return true; return true;
@ -241,22 +241,22 @@ public class CompactionPipeline {
return minSequenceId; return minSequenceId;
} }
public MemStoreSize getTailSize() { public MemStoreSizing getTailSizing() {
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true); if (localCopy.isEmpty()) return new MemStoreSizing();
return new MemStoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize()); return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize());
} }
public MemStoreSize getPipelineSize() { public MemStoreSizing getPipelineSizing() {
long keySize = 0; long keySize = 0;
long heapSize = 0; long heapSize = 0;
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true); if (localCopy.isEmpty()) return new MemStoreSizing();
for (Segment segment : localCopy) { for (Segment segment : localCopy) {
keySize += segment.keySize(); keySize += segment.keySize();
heapSize += segment.heapSize(); heapSize += segment.heapSize();
} }
return new MemStoreSize(keySize, heapSize); return new MemStoreSizing(keySize, heapSize);
} }
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,

View File

@ -257,13 +257,13 @@ public class CompositeImmutableSegment extends ImmutableSegment {
} }
@Override @Override
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }
@Override @Override
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSize memstoreSize) { MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }

View File

@ -195,26 +195,26 @@ public class DefaultMemStore extends AbstractMemStore {
byte [] fam = Bytes.toBytes("col"); byte [] fam = Bytes.toBytes("col");
byte [] qf = Bytes.toBytes("umn"); byte [] qf = Bytes.toBytes("umn");
byte [] empty = new byte[0]; byte [] empty = new byte[0];
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// Give each its own ts // Give each its own ts
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
} }
LOG.info("memstore1 estimated size=" LOG.info("memstore1 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
} }
LOG.info("memstore1 estimated size (2nd loading of same data)=" LOG.info("memstore1 estimated size (2nd loading of same data)="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
// Make a variably sized memstore. // Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore(); DefaultMemStore memstore2 = new DefaultMemStore();
memstoreSize = new MemStoreSize(); memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize); memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing);
} }
LOG.info("memstore2 estimated size=" LOG.info("memstore2 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
final int seconds = 30; final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
LOG.info("Exiting."); LOG.info("Exiting.");

View File

@ -537,11 +537,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final long startTime; final long startTime;
final long flushOpSeqId; final long flushOpSeqId;
final long flushedSeqId; final long flushedSeqId;
final MemStoreSize totalFlushableSize; final MemStoreSizing totalFlushableSize;
/** Constructs an early exit case */ /** Constructs an early exit case */
PrepareFlushResult(FlushResultImpl result, long flushSeqId) { PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemStoreSize()); this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, MemStoreSizing.DUD);
} }
/** Constructs a successful prepare flush result */ /** Constructs a successful prepare flush result */
@ -549,7 +549,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], StoreFlushContext> storeFlushCtxs, TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles, TreeMap<byte[], List<Path>> committedFiles,
TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId, TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId,
long flushedSeqId, MemStoreSize totalFlushableSize) { long flushedSeqId, MemStoreSizing totalFlushableSize) {
this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
flushSeqId, flushedSeqId, totalFlushableSize); flushSeqId, flushedSeqId, totalFlushableSize);
} }
@ -559,7 +559,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], StoreFlushContext> storeFlushCtxs, TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles, TreeMap<byte[], List<Path>> committedFiles,
TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId, TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId,
long flushedSeqId, MemStoreSize totalFlushableSize) { long flushedSeqId, MemStoreSizing totalFlushableSize) {
this.result = result; this.result = result;
this.storeFlushCtxs = storeFlushCtxs; this.storeFlushCtxs = storeFlushCtxs;
this.committedFiles = committedFiles; this.committedFiles = committedFiles;
@ -1711,7 +1711,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.closed.set(true); this.closed.set(true);
if (!canFlush) { if (!canFlush) {
this.decrMemStoreSize(new MemStoreSize(memstoreDataSize.get(), getMemStoreHeapSize())); this.decrMemStoreSize(new MemStoreSizing(memstoreDataSize.get(), getMemStoreHeapSize()));
} else if (memstoreDataSize.get() != 0) { } else if (memstoreDataSize.get() != 0) {
LOG.error("Memstore size is " + memstoreDataSize.get()); LOG.error("Memstore size is " + memstoreDataSize.get());
} }
@ -2502,7 +2502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// block waiting for the lock for internal flush // block waiting for the lock for internal flush
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
MemStoreSize totalSizeOfFlushableStores = new MemStoreSize(); MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>(); Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (HStore store : storesToFlush) { for (HStore store : storesToFlush) {
@ -2546,7 +2546,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (HStore s : storesToFlush) { for (HStore s : storesToFlush) {
MemStoreSize flushableSize = s.getFlushableSize(); MemStoreSize flushableSize = s.getFlushableSize();
totalSizeOfFlushableStores.incMemStoreSize(flushableSize); totalSizeOfFlushableStores.incMemStoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId)); storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(),
s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL
storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize); storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize);
} }
@ -3323,7 +3324,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int cellCount = 0; int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */ /** Keep track of the locks we hold so we can release them in finally clause */
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreAccounting = new MemStoreSizing();
try { try {
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0; int numReadyToWrite = 0;
@ -3506,11 +3507,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.updateSequenceId(batchOp.familyCellMaps[i].values(), this.updateSequenceId(batchOp.familyCellMaps[i].values(),
replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
} }
applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize); applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting);
} }
// update memstore size // update memstore size
this.addAndGetMemStoreSize(memStoreSize); this.addAndGetMemStoreSize(memStoreAccounting);
// calling the post CP hook for batch mutation // calling the post CP hook for batch mutation
if (!replay && coprocessorHost != null) { if (!replay && coprocessorHost != null) {
@ -3983,12 +3984,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param memstoreSize * @param memstoreSize
*/ */
private void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap, private void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
MemStoreSize memstoreSize) throws IOException { MemStoreSizing memstoreAccounting) throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey(); byte[] family = e.getKey();
List<Cell> cells = e.getValue(); List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess; assert cells instanceof RandomAccess;
applyToMemStore(getStore(family), cells, false, memstoreSize); applyToMemStore(getStore(family), cells, false, memstoreAccounting);
} }
} }
@ -3996,30 +3997,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
* set; when set we will run operations that make sense in the increment/append scenario * set; when set we will run operations that make sense in the increment/append scenario
* but that do not make sense otherwise. * but that do not make sense otherwise.
* @see #applyToMemStore(HStore, Cell, MemStoreSize) * @see #applyToMemStore(HStore, Cell, MemStoreSizing)
*/ */
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta, private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSize memstoreSize) throws IOException { MemStoreSizing memstoreAccounting) throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!! // Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1; boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) { if (upsert) {
store.upsert(cells, getSmallestReadPoint(), memstoreSize); store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else { } else {
store.add(cells, memstoreSize); store.add(cells, memstoreAccounting);
} }
} }
/** /**
* @see #applyToMemStore(HStore, List, boolean, MemStoreSize) * @see #applyToMemStore(HStore, List, boolean, MemStoreSizing)
*/ */
private void applyToMemStore(HStore store, Cell cell, MemStoreSize memstoreSize) private void applyToMemStore(HStore store, Cell cell, MemStoreSizing memstoreAccounting)
throws IOException { throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!! // Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
if (store == null) { if (store == null) {
checkFamily(CellUtil.cloneFamily(cell)); checkFamily(CellUtil.cloneFamily(cell));
// Unreachable because checkFamily will throw exception // Unreachable because checkFamily will throw exception
} }
store.add(cell, memstoreSize); store.add(cell, memstoreAccounting);
} }
/** /**
@ -4347,7 +4348,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
boolean flush = false; boolean flush = false;
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSize = new MemStoreSizing();
for (Cell cell: val.getCells()) { for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special // Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries // METACOLUMN info such as HBASE::CACHEFLUSH entries
@ -4843,7 +4844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException * @throws IOException
*/ */
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException { private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemStoreSize totalFreedSize = new MemStoreSize(); MemStoreSizing totalFreedSize = new MemStoreSizing();
this.updatesLock.writeLock().lock(); this.updatesLock.writeLock().lock();
try { try {
@ -5271,15 +5272,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Used by tests * Used by tests
* @param s Store to add edit too. * @param s Store to add edit too.
* @param cell Cell to add. * @param cell Cell to add.
* @param memstoreSize
*/ */
@VisibleForTesting @VisibleForTesting
protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) { protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreAccounting) {
s.add(cell, memstoreSize); s.add(cell, memstoreAccounting);
} }
/** /**
* @param fs
* @param p File to check. * @param p File to check.
* @return True if file was zero-length (and if so, we'll delete it in here). * @return True if file was zero-length (and if so, we'll delete it in here).
* @throws IOException * @throws IOException
@ -7114,7 +7113,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append // This is assigned by mvcc either explicity in the below or in the guts of the WAL append
// when it assigns the edit a sequencedid (A.K.A the mvcc write number). // when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null; WriteEntry writeEntry = null;
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreAccounting = new MemStoreSizing();
try { try {
boolean success = false; boolean success = false;
try { try {
@ -7158,7 +7157,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If no WAL, need to stamp it here. // If no WAL, need to stamp it here.
CellUtil.setSequenceId(cell, sequenceId); CellUtil.setSequenceId(cell, sequenceId);
} }
applyToMemStore(getStore(cell), cell, memstoreSize); applyToMemStore(getStore(cell), cell, memstoreAccounting);
} }
} }
@ -7194,7 +7193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
if (!mutations.isEmpty()) { if (!mutations.isEmpty()) {
long newSize = this.addAndGetMemStoreSize(memstoreSize); long newSize = this.addAndGetMemStoreSize(memstoreAccounting);
requestFlushIfNeeded(newSize); requestFlushIfNeeded(newSize);
} }
} }
@ -7298,7 +7297,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(op); startRegionOperation(op);
List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null; List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
RowLock rowLock = null; RowLock rowLock = null;
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreAccounting = new MemStoreSizing();
try { try {
rowLock = getRowLockInternal(mutation.getRow(), false); rowLock = getRowLockInternal(mutation.getRow(), false);
lock(this.updatesLock.readLock()); lock(this.updatesLock.readLock());
@ -7324,7 +7323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
// Now write to MemStore. Do it a column family at a time. // Now write to MemStore. Do it a column family at a time.
for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) { for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
applyToMemStore(e.getKey(), e.getValue(), true, memstoreSize); applyToMemStore(e.getKey(), e.getValue(), true, memstoreAccounting);
} }
mvcc.completeAndWait(writeEntry); mvcc.completeAndWait(writeEntry);
if (rsServices != null && rsServices.getNonceManager() != null) { if (rsServices != null && rsServices.getNonceManager() != null) {
@ -7347,7 +7346,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release(); rowLock.release();
} }
// Request a cache flush if over the limit. Do it outside update lock. // Request a cache flush if over the limit. Do it outside update lock.
if (isFlushSize(addAndGetMemStoreSize(memstoreSize))) { if (isFlushSize(addAndGetMemStoreSize(memstoreAccounting))) {
requestFlush(); requestFlush();
} }
closeRegionOperation(op); closeRegionOperation(op);

View File

@ -682,13 +682,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
/** /**
* Adds a value to the memstore * Adds a value to the memstore
* @param cell
* @param memstoreSize
*/ */
public void add(final Cell cell, MemStoreSize memstoreSize) { public void add(final Cell cell, MemStoreSizing memstoreSizing) {
lock.readLock().lock(); lock.readLock().lock();
try { try {
this.memstore.add(cell, memstoreSize); this.memstore.add(cell, memstoreSizing);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -696,13 +694,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
/** /**
* Adds the specified value to the memstore * Adds the specified value to the memstore
* @param cells
* @param memstoreSize
*/ */
public void add(final Iterable<Cell> cells, MemStoreSize memstoreSize) { public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
lock.readLock().lock(); lock.readLock().lock();
try { try {
memstore.add(cells, memstoreSize); memstore.add(cells, memstoreSizing);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -2143,16 +2139,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* <p> * <p>
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them. * across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs * @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize
* @throws IOException * @throws IOException
*/ */
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize) public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException { throws IOException {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
this.memstore.upsert(cells, readpoint, memstoreSize); this.memstore.upsert(cells, readpoint, memstoreSizing);
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }

View File

@ -68,18 +68,18 @@ public interface MemStore {
/** /**
* Write an update * Write an update
* @param cell * @param cell
* @param memstoreSize The delta in memstore size will be passed back via this. * @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta. * This will include both data size and heap overhead delta.
*/ */
void add(final Cell cell, MemStoreSize memstoreSize); void add(final Cell cell, MemStoreSizing memstoreSizing);
/** /**
* Write the updates * Write the updates
* @param cells * @param cells
* @param memstoreSize The delta in memstore size will be passed back via this. * @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta. * This will include both data size and heap overhead delta.
*/ */
void add(Iterable<Cell> cells, MemStoreSize memstoreSize); void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing);
/** /**
* @return Oldest timestamp of all the Cells in the MemStore * @return Oldest timestamp of all the Cells in the MemStore
@ -99,10 +99,10 @@ public interface MemStore {
* only see each KeyValue update as atomic. * only see each KeyValue update as atomic.
* @param cells * @param cells
* @param readpoint readpoint below which we can safely remove duplicate Cells. * @param readpoint readpoint below which we can safely remove duplicate Cells.
* @param memstoreSize The delta in memstore size will be passed back via this. * @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta. * This will include both data size and heap overhead delta.
*/ */
void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize); void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing);
/** /**
* @return scanner over the memstore. This might include scanner over the snapshot when one is * @return scanner over the memstore. This might include scanner over the snapshot when one is

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -17,71 +17,47 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Wraps the data size part and total heap space occupied by the memstore. * Reports the data size part and total heap space occupied by the MemStore.
* Read-only.
* @see MemStoreSizing
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MemStoreSize { public class MemStoreSize {
/**
*'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
* be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
* or off heap LABs
*/
protected long dataSize;
// 'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can /** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
// be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap * When Cells in on heap area, this will include the cells data size as well.
// or off heap LABs */
private long dataSize; protected long heapSize;
// 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
// When Cells in on heap area, this will include the cells data size as well.
private long heapSize;
final private boolean isEmpty;
public MemStoreSize() { public MemStoreSize() {
dataSize = 0; this(0L, 0L);
heapSize = 0;
isEmpty = false;
}
public MemStoreSize(boolean isEmpty) {
dataSize = 0;
heapSize = 0;
this.isEmpty = isEmpty;
}
public boolean isEmpty() {
return isEmpty;
} }
public MemStoreSize(long dataSize, long heapSize) { public MemStoreSize(long dataSize, long heapSize) {
this.dataSize = dataSize; this.dataSize = dataSize;
this.heapSize = heapSize; this.heapSize = heapSize;
this.isEmpty = false;
} }
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) { public boolean isEmpty() {
this.dataSize += dataSizeDelta; return this.dataSize == 0 && this.heapSize == 0;
this.heapSize += heapSizeDelta;
}
public void incMemStoreSize(MemStoreSize delta) {
this.dataSize += delta.dataSize;
this.heapSize += delta.heapSize;
}
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize -= dataSizeDelta;
this.heapSize -= heapSizeDelta;
}
public void decMemStoreSize(MemStoreSize delta) {
this.dataSize -= delta.dataSize;
this.heapSize -= delta.heapSize;
} }
public long getDataSize() { public long getDataSize() {
return isEmpty ? 0 : dataSize; return this.dataSize;
} }
public long getHeapSize() { public long getHeapSize() {
return isEmpty ? 0 : heapSize; return this.heapSize;
} }
@Override @Override

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Accounting of current heap and data sizes.
* Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only.
* For internal use.
* @see MemStoreSize
*/
@InterfaceAudience.Private
public class MemStoreSizing extends MemStoreSize {
public static final MemStoreSizing DUD = new MemStoreSizing() {
@Override
public void incMemStoreSize(MemStoreSize delta) {
incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
@Override
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
throw new RuntimeException("I'm a dud, you can't use me!");
}
@Override
public void decMemStoreSize(MemStoreSize delta) {
decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
@Override
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
throw new RuntimeException("I'm a dud, you can't use me!");
}
};
public MemStoreSizing() {
super();
}
public MemStoreSizing(long dataSize, long heapSize) {
super(dataSize, heapSize);
}
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize += dataSizeDelta;
this.heapSize += heapSizeDelta;
}
public void incMemStoreSize(MemStoreSize delta) {
incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize -= dataSizeDelta;
this.heapSize -= heapSizeDelta;
}
public void decMemStoreSize(MemStoreSize delta) {
decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
public void empty() {
this.dataSize = 0L;
this.heapSize = 0L;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof MemStoreSizing)) {
return false;
}
MemStoreSizing other = (MemStoreSizing) obj;
return this.dataSize == other.dataSize && this.heapSize == other.heapSize;
}
@Override
public int hashCode() {
long h = 13 * this.dataSize;
h = h + 14 * this.heapSize;
return (int) h;
}
@Override
public String toString() {
return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize;
}
}

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
* org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress) * org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress)
* @param T Pair&lt;Mutation, Integer&gt; pair of Mutations and associated rowlock ids . * @param T Pair&lt;Mutation, Integer&gt; pair of Mutations and associated rowlock ids .
*/ */
@InterfaceAudience.LimitedPrivate("Coprocessors") @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MiniBatchOperationInProgress<T> { public class MiniBatchOperationInProgress<T> {
private final T[] operations; private final T[] operations;
private Mutation[][] operationsFromCoprocessors; private Mutation[][] operationsFromCoprocessors;

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -51,14 +50,13 @@ public class MutableSegment extends Segment {
* Adds the given cell into the segment * Adds the given cell into the segment
* @param cell the cell to add * @param cell the cell to add
* @param mslabUsed whether using MSLAB * @param mslabUsed whether using MSLAB
* @param memstoreSize
*/ */
public void add(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) {
internalAdd(cell, mslabUsed, memstoreSize); internalAdd(cell, mslabUsed, memStoreSizing);
} }
public void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) { public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) {
internalAdd(cell, false, memstoreSize); internalAdd(cell, false, memStoreSizing);
// Get the Cells for the row/family/qualifier regardless of timestamp. // Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts // For this case we want to clean up any other puts
@ -90,8 +88,8 @@ public class MutableSegment extends Segment {
int cellLen = getCellLength(cur); int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true); long heapSize = heapSizeChange(cur, true);
this.incSize(-cellLen, -heapSize); this.incSize(-cellLen, -heapSize);
if (memstoreSize != null) { if (memStoreSizing != null) {
memstoreSize.decMemStoreSize(cellLen, heapSize); memStoreSizing.decMemStoreSize(cellLen, heapSize);
} }
it.remove(); it.remove();
} else { } else {

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Pair;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RegionServerAccounting { public class RegionServerAccounting {
// memstore data size // memstore data size
private final LongAdder globalMemstoreDataSize = new LongAdder(); private final LongAdder globalMemstoreDataSize = new LongAdder();
// memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell
@ -46,7 +45,7 @@ public class RegionServerAccounting {
// Store the edits size during replaying WAL. Use this to roll back the // Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed. // global memstore size once a region opening failed.
private final ConcurrentMap<byte[], MemStoreSize> replayEditsPerRegion = private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
private long globalMemStoreLimit; private long globalMemStoreLimit;
@ -216,14 +215,14 @@ public class RegionServerAccounting {
* @param memStoreSize the Memstore size will be added to replayEditsPerRegion. * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
*/ */
public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) { public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
MemStoreSize replayEdistsSize = replayEditsPerRegion.get(regionName); MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
// All ops on the same MemStoreSize object is going to be done by single thread, sequentially // All ops on the same MemStoreSize object is going to be done by single thread, sequentially
// only. First calls to this method to increment the per region reply edits size and then call // only. First calls to this method to increment the per region reply edits size and then call
// to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
// the region open operation. No need to handle multi thread issues on one region's entry in // the region open operation. No need to handle multi thread issues on one region's entry in
// this Map. // this Map.
if (replayEdistsSize == null) { if (replayEdistsSize == null) {
replayEdistsSize = new MemStoreSize(); replayEdistsSize = new MemStoreSizing();
replayEditsPerRegion.put(regionName, replayEdistsSize); replayEditsPerRegion.put(regionName, replayEdistsSize);
} }
replayEdistsSize.incMemStoreSize(memStoreSize); replayEdistsSize.incMemStoreSize(memStoreSize);

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -277,13 +277,13 @@ public abstract class Segment {
return comparator; return comparator;
} }
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
boolean succ = getCellSet().add(cell); boolean succ = getCellSet().add(cell);
updateMetaInfo(cell, succ, mslabUsed, memstoreSize); updateMetaInfo(cell, succ, mslabUsed, memstoreSizing);
} }
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSize memstoreSize) { MemStoreSizing memstoreSizing) {
long cellSize = 0; long cellSize = 0;
// If there's already a same cell in the CellSet and we are using MSLAB, we must count in the // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
// MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
@ -293,8 +293,8 @@ public abstract class Segment {
} }
long heapSize = heapSizeChange(cellToAdd, succ); long heapSize = heapSizeChange(cellToAdd, succ);
incSize(cellSize, heapSize); incSize(cellSize, heapSize);
if (memstoreSize != null) { if (memstoreSizing != null) {
memstoreSize.incMemStoreSize(cellSize, heapSize); memstoreSizing.incMemStoreSize(cellSize, heapSize);
} }
getTimeRangeTracker().includeTimestamp(cellToAdd); getTimeRangeTracker().includeTimestamp(cellToAdd);
minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId()); minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
@ -307,8 +307,8 @@ public abstract class Segment {
} }
} }
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSize memstoreSize) { protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) {
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize); updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing);
} }
/** /**

View File

@ -95,17 +95,18 @@ public final class SegmentFactory {
// create flat immutable segment from non-flat immutable segment // create flat immutable segment from non-flat immutable segment
// for flattening // for flattening
public ImmutableSegment createImmutableSegmentByFlattening( public ImmutableSegment createImmutableSegmentByFlattening(
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemStoreSize memstoreSize) { CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
MemStoreSizing memstoreSizing) {
ImmutableSegment res = null; ImmutableSegment res = null;
switch (idxType) { switch (idxType) {
case CHUNK_MAP: case CHUNK_MAP:
res = new CellChunkImmutableSegment(segment, memstoreSize); res = new CellChunkImmutableSegment(segment, memstoreSizing);
break; break;
case CSLM_MAP: case CSLM_MAP:
assert false; // non-flat segment can not be the result of flattening assert false; // non-flat segment can not be the result of flattening
break; break;
case ARRAY_MAP: case ARRAY_MAP:
res = new CellArrayImmutableSegment(segment, memstoreSize); res = new CellArrayImmutableSegment(segment, memstoreSizing);
break; break;
} }
return res; return res;

View File

@ -607,18 +607,18 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) { private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSizing = new MemStoreSizing();
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts Threads.sleep(1); // to make sure each kv gets a different ts
byte[] row = Bytes.toBytes(keys[i]); byte[] row = Bytes.toBytes(keys[i]);
byte[] val = Bytes.toBytes(keys[i] + i); byte[] val = Bytes.toBytes(keys[i] + i);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val); KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
hmc.add(kv, memstoreSize); hmc.add(kv, memstoreSizing);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
} }
regionServicesForStores.addMemStoreSize(memstoreSize); regionServicesForStores.addMemStoreSize(memstoreSizing);
return memstoreSize.getDataSize(); return memstoreSizing.getDataSize();
} }
private long cellBeforeFlushSize() { private long cellBeforeFlushSize() {

View File

@ -126,9 +126,9 @@ public class TestDefaultMemStore {
public void testPutSameCell() { public void testPutSameCell() {
byte[] bytes = Bytes.toBytes(getName()); byte[] bytes = Bytes.toBytes(getName());
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes); KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
MemStoreSize sizeChangeForFirstCell = new MemStoreSize(); MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing();
this.memstore.add(kv, sizeChangeForFirstCell); this.memstore.add(kv, sizeChangeForFirstCell);
MemStoreSize sizeChangeForSecondCell = new MemStoreSize(); MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing();
this.memstore.add(kv, sizeChangeForSecondCell); this.memstore.add(kv, sizeChangeForSecondCell);
// make sure memstore size increase won't double-count MSLAB chunk size // make sure memstore size increase won't double-count MSLAB chunk size
assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize()); assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize());

View File

@ -230,7 +230,7 @@ public class TestHStore {
* @throws Exception * @throws Exception
*/ */
@Test @Test
public void testFlushSizeAccounting() throws Exception { public void testFlushSizeSizing() throws Exception {
LOG.info("Setting up a faulty file system that cannot write in " + LOG.info("Setting up a faulty file system that cannot write in " +
this.name.getMethodName()); this.name.getMethodName());
final Configuration conf = HBaseConfiguration.create(); final Configuration conf = HBaseConfiguration.create();
@ -254,7 +254,7 @@ public class TestHStore {
MemStoreSize size = store.memstore.getFlushableSize(); MemStoreSize size = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize()); assertEquals(0, size.getDataSize());
LOG.info("Adding some data"); LOG.info("Adding some data");
MemStoreSize kvSize = new MemStoreSize(); MemStoreSizing kvSize = new MemStoreSizing();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize); store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment // add the heap size of active (mutable) segment
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD); kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
@ -273,7 +273,7 @@ public class TestHStore {
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD); CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize(); size = store.memstore.getFlushableSize();
assertEquals(kvSize, size); assertEquals(kvSize, size);
MemStoreSize kvSize2 = new MemStoreSize(); MemStoreSizing kvSize2 = new MemStoreSizing();
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2); store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD); kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have // Even though we add a new kv, we expect the flushable size to be 'same' since we have
@ -1217,7 +1217,7 @@ public class TestHStore {
byte[] value0 = Bytes.toBytes("value0"); byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1"); byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2"); byte[] value2 = Bytes.toBytes("value2");
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
@ -1229,18 +1229,18 @@ public class TestHStore {
} }
}); });
// The cells having the value0 won't be flushed to disk because the value of max version is 1 // The cells having the value0 won't be flushed to disk because the value of max version is 1
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize); store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize); store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize); store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize); store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize); store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize); store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize); store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize); store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize); store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize); store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize); store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize); store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
List<Cell> myList = new MyList<>(hook); List<Cell> myList = new MyList<>(hook);
Scan scan = new Scan() Scan scan = new Scan()
.withStartRow(r1) .withStartRow(r1)
@ -1276,13 +1276,13 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("value"); byte[] value = Bytes.toBytes("value");
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize); store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSize); store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSize); store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1); quals.add(qf1);
quals.add(qf2); quals.add(qf2);
@ -1354,22 +1354,22 @@ public class TestHStore {
}); });
byte[] oldValue = Bytes.toBytes("oldValue"); byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue"); byte[] currentValue = Bytes.toBytes("currentValue");
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize); myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize); myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize); myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++; long snapshotId = id++;
// push older data into snapshot -- phase (1/4) // push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// insert current data into active -- phase (2/4) // insert current data into active -- phase (2/4)
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize); myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize); myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize); myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR); TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1); quals.add(qf1);
quals.add(qf2); quals.add(qf2);
@ -1467,21 +1467,21 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue"); byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime(); long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100; long seqId = 100;
// older data whihc shouldn't be "seen" by client // older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize); store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSize); store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSize); store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++); StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread // This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction. // hasn't accomplished the in-memory compaction.
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize); store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
//okay. Let the compaction be completed //okay. Let the compaction be completed
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown(); MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
@ -1490,9 +1490,9 @@ public class TestHStore {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
// This should invoke another in-memory flush. // This should invoke another in-memory flush.
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize); store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE)); String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
@ -1589,25 +1589,25 @@ public class TestHStore {
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
// Set the lower threshold to invoke the "MERGE" policy // Set the lower threshold to invoke the "MERGE" policy
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {}); MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
MemStoreSize memStoreSize = new MemStoreSize(); MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
long seqID = 1l; long seqID = 1l;
// Add some data to the region and do some flushes // Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) { for (int i = 1; i < 10; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize); memStoreSizing);
} }
// flush them // flush them
flushStore(store, seqID); flushStore(store, seqID);
for (int i = 11; i < 20; i++) { for (int i = 11; i < 20; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize); memStoreSizing);
} }
// flush them // flush them
flushStore(store, seqID); flushStore(store, seqID);
for (int i = 21; i < 30; i++) { for (int i = 21; i < 30; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize); memStoreSizing);
} }
// flush them // flush them
flushStore(store, seqID); flushStore(store, seqID);
@ -1624,14 +1624,14 @@ public class TestHStore {
// create more store files // create more store files
for (int i = 31; i < 40; i++) { for (int i = 31; i < 40; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize); memStoreSizing);
} }
// flush them // flush them
flushStore(store, seqID); flushStore(store, seqID);
for (int i = 41; i < 50; i++) { for (int i = 41; i < 50; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize); memStoreSizing);
} }
// flush them // flush them
flushStore(store, seqID); flushStore(store, seqID);

View File

@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -77,8 +77,8 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MemStoreSize;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -543,8 +543,8 @@ public abstract class AbstractTestWALReplay {
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
@Override @Override
protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) { protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
super.restoreEdit(s, cell, memstoreSize); super.restoreEdit(s, cell, memstoreSizing);
countOfRestoredEdits.incrementAndGet(); countOfRestoredEdits.incrementAndGet();
} }
}; };