HBASE-19074 Miscellaneous Observer cleanups

Breaks MemStoreSize into MemStoreSize (read-only) and MemStoreSizing
(read/write). MemStoreSize we allow to Coprocesors. MemStoreSizing we
use internally doing MemStore accounting.
This commit is contained in:
Michael Stack 2017-10-23 20:57:46 -07:00
parent a49850e5c3
commit 60367b2a27
23 changed files with 308 additions and 222 deletions

View File

@ -880,7 +880,10 @@ public interface RegionObserver {
* Called before a {@link WALEdit}
* replayed for this region.
* @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
@ -888,7 +891,10 @@ public interface RegionObserver {
* Called after a {@link WALEdit}
* replayed for this region.
* @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}

View File

@ -72,8 +72,11 @@ public interface WALObserver {
* is writen to WAL.
*
* @return true if default behavior should be bypassed, false otherwise
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/
// TODO: return value is not used
@Deprecated
default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
return false;
@ -82,7 +85,10 @@ public interface WALObserver {
/**
* Called after a {@link WALEdit}
* is writen to WAL.
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/
@Deprecated
default void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -96,14 +96,14 @@ public abstract class AbstractMemStore implements MemStore {
public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
@Override
public void add(Iterable<Cell> cells, MemStoreSize memstoreSize) {
public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) {
add(cell, memstoreSize);
add(cell, memstoreSizing);
}
}
@Override
public void add(Cell cell, MemStoreSize memstoreSize) {
public void add(Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(cell);
boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
@ -118,7 +118,7 @@ public abstract class AbstractMemStore implements MemStore {
if (!mslabUsed) {
toAdd = deepCopyIfNeeded(toAdd);
}
internalAdd(toAdd, mslabUsed, memstoreSize);
internalAdd(toAdd, mslabUsed, memstoreSizing);
}
private static Cell deepCopyIfNeeded(Cell cell) {
@ -129,9 +129,9 @@ public abstract class AbstractMemStore implements MemStore {
}
@Override
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize) {
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) {
upsert(cell, readpoint, memstoreSize);
upsert(cell, readpoint, memstoreSizing);
}
}
@ -167,7 +167,11 @@ public abstract class AbstractMemStore implements MemStore {
@Override
public MemStoreSize getSnapshotSize() {
return new MemStoreSize(this.snapshot.keySize(), this.snapshot.heapSize());
return getSnapshotSizing();
}
MemStoreSizing getSnapshotSizing() {
return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize());
}
@Override
@ -210,7 +214,7 @@ public abstract class AbstractMemStore implements MemStore {
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize
*/
private void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) {
private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
@ -221,7 +225,7 @@ public abstract class AbstractMemStore implements MemStore {
// must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
// prevent it from getting GCed.
cell = deepCopyIfNeeded(cell);
this.active.upsert(cell, readpoint, memstoreSize);
this.active.upsert(cell, readpoint, memstoreSizing);
setOldestEditTimeToNow();
checkActiveSize();
}
@ -277,8 +281,8 @@ public abstract class AbstractMemStore implements MemStore {
* @param mslabUsed whether using MSLAB
* @param memstoreSize
*/
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSize memstoreSize) {
active.add(toAdd, mslabUsed, memstoreSize);
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
active.add(toAdd, mslabUsed, memstoreSizing);
setOldestEditTimeToNow();
checkActiveSize();
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
@ -55,7 +54,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) {
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
int numOfCells = segment.getCellsCount();
@ -65,7 +64,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
}
@Override

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
@ -61,7 +60,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) {
protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
int numOfCells = segment.getCellsCount();
@ -73,7 +73,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta);
memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
}
@Override

View File

@ -146,12 +146,12 @@ public class CompactingMemStore extends AbstractMemStore {
*/
@Override
public MemStoreSize size() {
MemStoreSize memstoreSize = new MemStoreSize();
memstoreSize.incMemStoreSize(this.active.keySize(), this.active.heapSize());
MemStoreSizing memstoreSizing = new MemStoreSizing();
memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
for (Segment item : pipeline.getSegments()) {
memstoreSize.incMemStoreSize(item.keySize(), item.heapSize());
memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize());
}
return memstoreSize;
return memstoreSizing;
}
/**
@ -215,17 +215,17 @@ public class CompactingMemStore extends AbstractMemStore {
*/
@Override
public MemStoreSize getFlushableSize() {
MemStoreSize snapshotSize = getSnapshotSize();
if (snapshotSize.getDataSize() == 0) {
MemStoreSizing snapshotSizing = getSnapshotSizing();
if (snapshotSizing.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) {
snapshotSize = pipeline.getPipelineSize();
snapshotSize.incMemStoreSize(this.active.keySize(), this.active.heapSize());
snapshotSizing = pipeline.getPipelineSizing();
snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
} else {
snapshotSize = pipeline.getTailSize();
snapshotSizing = pipeline.getTailSizing();
}
}
return snapshotSize.getDataSize() > 0 ? snapshotSize
return snapshotSizing.getDataSize() > 0 ? snapshotSizing
: new MemStoreSize(this.active.keySize(), this.active.heapSize());
}

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -149,7 +149,7 @@ public class CompactionPipeline {
long newHeapSize = 0;
if(segment != null) newHeapSize = segment.heapSize();
long heapSizeDelta = suffixHeapSize - newHeapSize;
region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta));
region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta));
if (LOG.isDebugEnabled()) {
LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
+ newDataSize + ". Suffix heap size: " + suffixHeapSize
@ -199,14 +199,14 @@ public class CompactionPipeline {
int i = 0;
for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) {
MemStoreSize newMemstoreSize = new MemStoreSize(); // the size to be updated
MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreSize);
(CSLMImmutableSegment)s,idxType,newMemstoreAccounting);
replaceAtIndex(i,newS);
if(region != null) {
// update the global memstore size counter
// upon flattening there is no change in the data size
region.addMemStoreSize(new MemStoreSize(0, newMemstoreSize.getHeapSize()));
region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize()));
}
LOG.debug("Compaction pipeline segment " + s + " was flattened");
return true;
@ -241,22 +241,22 @@ public class CompactionPipeline {
return minSequenceId;
}
public MemStoreSize getTailSize() {
public MemStoreSizing getTailSizing() {
LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true);
return new MemStoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize());
if (localCopy.isEmpty()) return new MemStoreSizing();
return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize());
}
public MemStoreSize getPipelineSize() {
public MemStoreSizing getPipelineSizing() {
long keySize = 0;
long heapSize = 0;
LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true);
if (localCopy.isEmpty()) return new MemStoreSizing();
for (Segment segment : localCopy) {
keySize += segment.keySize();
heapSize += segment.heapSize();
}
return new MemStoreSize(keySize, heapSize);
return new MemStoreSizing(keySize, heapSize);
}
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,

View File

@ -257,13 +257,13 @@ public class CompositeImmutableSegment extends ImmutableSegment {
}
@Override
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) {
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSize memstoreSize) {
MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}

View File

@ -195,26 +195,26 @@ public class DefaultMemStore extends AbstractMemStore {
byte [] fam = Bytes.toBytes("col");
byte [] qf = Bytes.toBytes("umn");
byte [] empty = new byte[0];
MemStoreSize memstoreSize = new MemStoreSize();
MemStoreSizing memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) {
// Give each its own ts
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
}
LOG.info("memstore1 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize()));
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
for (int i = 0; i < count; i++) {
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
}
LOG.info("memstore1 estimated size (2nd loading of same data)="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize()));
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
// Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore();
memstoreSize = new MemStoreSize();
memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) {
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize);
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing);
}
LOG.info("memstore2 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize()));
+ (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
LOG.info("Exiting.");

View File

@ -537,11 +537,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final long startTime;
final long flushOpSeqId;
final long flushedSeqId;
final MemStoreSize totalFlushableSize;
final MemStoreSizing totalFlushableSize;
/** Constructs an early exit case */
PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemStoreSize());
this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, MemStoreSizing.DUD);
}
/** Constructs a successful prepare flush result */
@ -549,7 +549,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles,
TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId,
long flushedSeqId, MemStoreSize totalFlushableSize) {
long flushedSeqId, MemStoreSizing totalFlushableSize) {
this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
flushSeqId, flushedSeqId, totalFlushableSize);
}
@ -559,7 +559,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles,
TreeMap<byte[], MemStoreSize> storeFlushableSize, long startTime, long flushSeqId,
long flushedSeqId, MemStoreSize totalFlushableSize) {
long flushedSeqId, MemStoreSizing totalFlushableSize) {
this.result = result;
this.storeFlushCtxs = storeFlushCtxs;
this.committedFiles = committedFiles;
@ -1711,7 +1711,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.closed.set(true);
if (!canFlush) {
this.decrMemStoreSize(new MemStoreSize(memstoreDataSize.get(), getMemStoreHeapSize()));
this.decrMemStoreSize(new MemStoreSizing(memstoreDataSize.get(), getMemStoreHeapSize()));
} else if (memstoreDataSize.get() != 0) {
LOG.error("Memstore size is " + memstoreDataSize.get());
}
@ -2502,7 +2502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
MemStoreSize totalSizeOfFlushableStores = new MemStoreSize();
MemStoreSizing totalSizeOfFlushableStores = new MemStoreSizing();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (HStore store : storesToFlush) {
@ -2546,7 +2546,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (HStore s : storesToFlush) {
MemStoreSize flushableSize = s.getFlushableSize();
totalSizeOfFlushableStores.incMemStoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId));
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(),
s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL
storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize);
}
@ -3323,7 +3324,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreAccounting = new MemStoreSizing();
try {
// STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
int numReadyToWrite = 0;
@ -3506,11 +3507,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.updateSequenceId(batchOp.familyCellMaps[i].values(),
replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
}
applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize);
applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting);
}
// update memstore size
this.addAndGetMemStoreSize(memStoreSize);
this.addAndGetMemStoreSize(memStoreAccounting);
// calling the post CP hook for batch mutation
if (!replay && coprocessorHost != null) {
@ -3983,12 +3984,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param memstoreSize
*/
private void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
MemStoreSize memstoreSize) throws IOException {
MemStoreSizing memstoreAccounting) throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess;
applyToMemStore(getStore(family), cells, false, memstoreSize);
applyToMemStore(getStore(family), cells, false, memstoreAccounting);
}
}
@ -3996,30 +3997,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
* set; when set we will run operations that make sense in the increment/append scenario
* but that do not make sense otherwise.
* @see #applyToMemStore(HStore, Cell, MemStoreSize)
* @see #applyToMemStore(HStore, Cell, MemStoreSizing)
*/
private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
MemStoreSize memstoreSize) throws IOException {
MemStoreSizing memstoreAccounting) throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
store.upsert(cells, getSmallestReadPoint(), memstoreSize);
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
store.add(cells, memstoreSize);
store.add(cells, memstoreAccounting);
}
}
/**
* @see #applyToMemStore(HStore, List, boolean, MemStoreSize)
* @see #applyToMemStore(HStore, List, boolean, MemStoreSizing)
*/
private void applyToMemStore(HStore store, Cell cell, MemStoreSize memstoreSize)
private void applyToMemStore(HStore store, Cell cell, MemStoreSizing memstoreAccounting)
throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
if (store == null) {
checkFamily(CellUtil.cloneFamily(cell));
// Unreachable because checkFamily will throw exception
}
store.add(cell, memstoreSize);
store.add(cell, memstoreAccounting);
}
/**
@ -4347,7 +4348,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
boolean flush = false;
MemStoreSize memstoreSize = new MemStoreSize();
MemStoreSizing memstoreSize = new MemStoreSizing();
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
@ -4843,7 +4844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException
*/
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemStoreSize totalFreedSize = new MemStoreSize();
MemStoreSizing totalFreedSize = new MemStoreSizing();
this.updatesLock.writeLock().lock();
try {
@ -5271,15 +5272,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Used by tests
* @param s Store to add edit too.
* @param cell Cell to add.
* @param memstoreSize
*/
@VisibleForTesting
protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) {
s.add(cell, memstoreSize);
protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreAccounting) {
s.add(cell, memstoreAccounting);
}
/**
* @param fs
* @param p File to check.
* @return True if file was zero-length (and if so, we'll delete it in here).
* @throws IOException
@ -7114,7 +7113,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null;
MemStoreSize memstoreSize = new MemStoreSize();
MemStoreSizing memstoreAccounting = new MemStoreSizing();
try {
boolean success = false;
try {
@ -7158,7 +7157,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If no WAL, need to stamp it here.
CellUtil.setSequenceId(cell, sequenceId);
}
applyToMemStore(getStore(cell), cell, memstoreSize);
applyToMemStore(getStore(cell), cell, memstoreAccounting);
}
}
@ -7194,7 +7193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
closeRegionOperation();
if (!mutations.isEmpty()) {
long newSize = this.addAndGetMemStoreSize(memstoreSize);
long newSize = this.addAndGetMemStoreSize(memstoreAccounting);
requestFlushIfNeeded(newSize);
}
}
@ -7298,7 +7297,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(op);
List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
RowLock rowLock = null;
MemStoreSize memstoreSize = new MemStoreSize();
MemStoreSizing memstoreAccounting = new MemStoreSizing();
try {
rowLock = getRowLockInternal(mutation.getRow(), false);
lock(this.updatesLock.readLock());
@ -7324,7 +7323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Now write to MemStore. Do it a column family at a time.
for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
applyToMemStore(e.getKey(), e.getValue(), true, memstoreSize);
applyToMemStore(e.getKey(), e.getValue(), true, memstoreAccounting);
}
mvcc.completeAndWait(writeEntry);
if (rsServices != null && rsServices.getNonceManager() != null) {
@ -7347,7 +7346,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release();
}
// Request a cache flush if over the limit. Do it outside update lock.
if (isFlushSize(addAndGetMemStoreSize(memstoreSize))) {
if (isFlushSize(addAndGetMemStoreSize(memstoreAccounting))) {
requestFlush();
}
closeRegionOperation(op);

View File

@ -682,13 +682,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
/**
* Adds a value to the memstore
* @param cell
* @param memstoreSize
*/
public void add(final Cell cell, MemStoreSize memstoreSize) {
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
this.memstore.add(cell, memstoreSize);
this.memstore.add(cell, memstoreSizing);
} finally {
lock.readLock().unlock();
}
@ -696,13 +694,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
/**
* Adds the specified value to the memstore
* @param cells
* @param memstoreSize
*/
public void add(final Iterable<Cell> cells, MemStoreSize memstoreSize) {
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
memstore.add(cells, memstoreSize);
memstore.add(cells, memstoreSizing);
} finally {
lock.readLock().unlock();
}
@ -2143,16 +2139,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* <p>
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize
* @throws IOException
*/
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize)
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing)
throws IOException {
this.lock.readLock().lock();
try {
this.memstore.upsert(cells, readpoint, memstoreSize);
this.memstore.upsert(cells, readpoint, memstoreSizing);
} finally {
this.lock.readLock().unlock();
}

View File

@ -68,18 +68,18 @@ public interface MemStore {
/**
* Write an update
* @param cell
* @param memstoreSize The delta in memstore size will be passed back via this.
* @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta.
*/
void add(final Cell cell, MemStoreSize memstoreSize);
void add(final Cell cell, MemStoreSizing memstoreSizing);
/**
* Write the updates
* @param cells
* @param memstoreSize The delta in memstore size will be passed back via this.
* @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta.
*/
void add(Iterable<Cell> cells, MemStoreSize memstoreSize);
void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing);
/**
* @return Oldest timestamp of all the Cells in the MemStore
@ -99,10 +99,10 @@ public interface MemStore {
* only see each KeyValue update as atomic.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate Cells.
* @param memstoreSize The delta in memstore size will be passed back via this.
* @param memstoreSizing The delta in memstore size will be passed back via this.
* This will include both data size and heap overhead delta.
*/
void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize);
void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing);
/**
* @return scanner over the memstore. This might include scanner over the snapshot when one is

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,71 +17,47 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Wraps the data size part and total heap space occupied by the memstore.
* Reports the data size part and total heap space occupied by the MemStore.
* Read-only.
* @see MemStoreSizing
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MemStoreSize {
/**
*'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
* be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
* or off heap LABs
*/
protected long dataSize;
// 'dataSize' tracks the Cell's data bytes size alone (Key bytes, value bytes). A cell's data can
// be in on heap or off heap area depending on the MSLAB and its configuration to be using on heap
// or off heap LABs
private long dataSize;
// 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
// When Cells in on heap area, this will include the cells data size as well.
private long heapSize;
final private boolean isEmpty;
/** 'heapSize' tracks all Cell's heap size occupancy. This will include Cell POJO heap overhead.
* When Cells in on heap area, this will include the cells data size as well.
*/
protected long heapSize;
public MemStoreSize() {
dataSize = 0;
heapSize = 0;
isEmpty = false;
}
public MemStoreSize(boolean isEmpty) {
dataSize = 0;
heapSize = 0;
this.isEmpty = isEmpty;
}
public boolean isEmpty() {
return isEmpty;
this(0L, 0L);
}
public MemStoreSize(long dataSize, long heapSize) {
this.dataSize = dataSize;
this.heapSize = heapSize;
this.isEmpty = false;
}
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize += dataSizeDelta;
this.heapSize += heapSizeDelta;
}
public void incMemStoreSize(MemStoreSize delta) {
this.dataSize += delta.dataSize;
this.heapSize += delta.heapSize;
}
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize -= dataSizeDelta;
this.heapSize -= heapSizeDelta;
}
public void decMemStoreSize(MemStoreSize delta) {
this.dataSize -= delta.dataSize;
this.heapSize -= delta.heapSize;
public boolean isEmpty() {
return this.dataSize == 0 && this.heapSize == 0;
}
public long getDataSize() {
return isEmpty ? 0 : dataSize;
return this.dataSize;
}
public long getHeapSize() {
return isEmpty ? 0 : heapSize;
return this.heapSize;
}
@Override

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Accounting of current heap and data sizes.
* Allows read/write on data/heap size as opposed to {@Link MemStoreSize} which is read-only.
* For internal use.
* @see MemStoreSize
*/
@InterfaceAudience.Private
public class MemStoreSizing extends MemStoreSize {
public static final MemStoreSizing DUD = new MemStoreSizing() {
@Override
public void incMemStoreSize(MemStoreSize delta) {
incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
@Override
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
throw new RuntimeException("I'm a dud, you can't use me!");
}
@Override
public void decMemStoreSize(MemStoreSize delta) {
decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
@Override
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
throw new RuntimeException("I'm a dud, you can't use me!");
}
};
public MemStoreSizing() {
super();
}
public MemStoreSizing(long dataSize, long heapSize) {
super(dataSize, heapSize);
}
public void incMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize += dataSizeDelta;
this.heapSize += heapSizeDelta;
}
public void incMemStoreSize(MemStoreSize delta) {
incMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
public void decMemStoreSize(long dataSizeDelta, long heapSizeDelta) {
this.dataSize -= dataSizeDelta;
this.heapSize -= heapSizeDelta;
}
public void decMemStoreSize(MemStoreSize delta) {
decMemStoreSize(delta.getDataSize(), delta.getHeapSize());
}
public void empty() {
this.dataSize = 0L;
this.heapSize = 0L;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof MemStoreSizing)) {
return false;
}
MemStoreSizing other = (MemStoreSizing) obj;
return this.dataSize == other.dataSize && this.heapSize == other.heapSize;
}
@Override
public int hashCode() {
long h = 13 * this.dataSize;
h = h + 14 * this.heapSize;
return (int) h;
}
@Override
public String toString() {
return "dataSize=" + this.dataSize + " , heapSize=" + this.heapSize;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
* org.apache.hadoop.hbase.coprocessor.ObserverContext, MiniBatchOperationInProgress)
* @param T Pair&lt;Mutation, Integer&gt; pair of Mutations and associated rowlock ids .
*/
@InterfaceAudience.LimitedPrivate("Coprocessors")
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class MiniBatchOperationInProgress<T> {
private final T[] operations;
private Mutation[][] operationsFromCoprocessors;

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -51,14 +50,13 @@ public class MutableSegment extends Segment {
* Adds the given cell into the segment
* @param cell the cell to add
* @param mslabUsed whether using MSLAB
* @param memstoreSize
*/
public void add(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) {
internalAdd(cell, mslabUsed, memstoreSize);
public void add(Cell cell, boolean mslabUsed, MemStoreSizing memStoreSizing) {
internalAdd(cell, mslabUsed, memStoreSizing);
}
public void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) {
internalAdd(cell, false, memstoreSize);
public void upsert(Cell cell, long readpoint, MemStoreSizing memStoreSizing) {
internalAdd(cell, false, memStoreSizing);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
@ -90,8 +88,8 @@ public class MutableSegment extends Segment {
int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true);
this.incSize(-cellLen, -heapSize);
if (memstoreSize != null) {
memstoreSize.decMemStoreSize(cellLen, heapSize);
if (memStoreSizing != null) {
memStoreSizing.decMemStoreSize(cellLen, heapSize);
}
it.remove();
} else {

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.Private
public class RegionServerAccounting {
// memstore data size
private final LongAdder globalMemstoreDataSize = new LongAdder();
// memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell
@ -46,7 +45,7 @@ public class RegionServerAccounting {
// Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed.
private final ConcurrentMap<byte[], MemStoreSize> replayEditsPerRegion =
private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
private long globalMemStoreLimit;
@ -216,14 +215,14 @@ public class RegionServerAccounting {
* @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
*/
public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
MemStoreSize replayEdistsSize = replayEditsPerRegion.get(regionName);
MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
// All ops on the same MemStoreSize object is going to be done by single thread, sequentially
// only. First calls to this method to increment the per region reply edits size and then call
// to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
// the region open operation. No need to handle multi thread issues on one region's entry in
// this Map.
if (replayEdistsSize == null) {
replayEdistsSize = new MemStoreSize();
replayEdistsSize = new MemStoreSizing();
replayEditsPerRegion.put(regionName, replayEdistsSize);
}
replayEdistsSize.incMemStoreSize(memStoreSize);

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -277,13 +277,13 @@ public abstract class Segment {
return comparator;
}
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) {
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
boolean succ = getCellSet().add(cell);
updateMetaInfo(cell, succ, mslabUsed, memstoreSize);
updateMetaInfo(cell, succ, mslabUsed, memstoreSizing);
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSize memstoreSize) {
MemStoreSizing memstoreSizing) {
long cellSize = 0;
// If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
// MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
@ -293,8 +293,8 @@ public abstract class Segment {
}
long heapSize = heapSizeChange(cellToAdd, succ);
incSize(cellSize, heapSize);
if (memstoreSize != null) {
memstoreSize.incMemStoreSize(cellSize, heapSize);
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, heapSize);
}
getTimeRangeTracker().includeTimestamp(cellToAdd);
minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
@ -307,8 +307,8 @@ public abstract class Segment {
}
}
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSize memstoreSize) {
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize);
protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemStoreSizing memstoreSizing) {
updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSizing);
}
/**

View File

@ -95,17 +95,18 @@ public final class SegmentFactory {
// create flat immutable segment from non-flat immutable segment
// for flattening
public ImmutableSegment createImmutableSegmentByFlattening(
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemStoreSize memstoreSize) {
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
MemStoreSizing memstoreSizing) {
ImmutableSegment res = null;
switch (idxType) {
case CHUNK_MAP:
res = new CellChunkImmutableSegment(segment, memstoreSize);
res = new CellChunkImmutableSegment(segment, memstoreSizing);
break;
case CSLM_MAP:
assert false; // non-flat segment can not be the result of flattening
break;
case ARRAY_MAP:
res = new CellArrayImmutableSegment(segment, memstoreSize);
res = new CellArrayImmutableSegment(segment, memstoreSizing);
break;
}
return res;

View File

@ -607,18 +607,18 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
MemStoreSize memstoreSize = new MemStoreSize();
MemStoreSizing memstoreSizing = new MemStoreSizing();
for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts
byte[] row = Bytes.toBytes(keys[i]);
byte[] val = Bytes.toBytes(keys[i] + i);
KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
hmc.add(kv, memstoreSize);
hmc.add(kv, memstoreSizing);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
}
regionServicesForStores.addMemStoreSize(memstoreSize);
return memstoreSize.getDataSize();
regionServicesForStores.addMemStoreSize(memstoreSizing);
return memstoreSizing.getDataSize();
}
private long cellBeforeFlushSize() {

View File

@ -126,9 +126,9 @@ public class TestDefaultMemStore {
public void testPutSameCell() {
byte[] bytes = Bytes.toBytes(getName());
KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
MemStoreSize sizeChangeForFirstCell = new MemStoreSize();
MemStoreSizing sizeChangeForFirstCell = new MemStoreSizing();
this.memstore.add(kv, sizeChangeForFirstCell);
MemStoreSize sizeChangeForSecondCell = new MemStoreSize();
MemStoreSizing sizeChangeForSecondCell = new MemStoreSizing();
this.memstore.add(kv, sizeChangeForSecondCell);
// make sure memstore size increase won't double-count MSLAB chunk size
assertEquals(Segment.getCellLength(kv), sizeChangeForFirstCell.getDataSize());

View File

@ -230,7 +230,7 @@ public class TestHStore {
* @throws Exception
*/
@Test
public void testFlushSizeAccounting() throws Exception {
public void testFlushSizeSizing() throws Exception {
LOG.info("Setting up a faulty file system that cannot write in " +
this.name.getMethodName());
final Configuration conf = HBaseConfiguration.create();
@ -254,7 +254,7 @@ public class TestHStore {
MemStoreSize size = store.memstore.getFlushableSize();
assertEquals(0, size.getDataSize());
LOG.info("Adding some data");
MemStoreSize kvSize = new MemStoreSize();
MemStoreSizing kvSize = new MemStoreSizing();
store.add(new KeyValue(row, family, qf1, 1, (byte[]) null), kvSize);
// add the heap size of active (mutable) segment
kvSize.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
@ -273,7 +273,7 @@ public class TestHStore {
CSLMImmutableSegment.DEEP_OVERHEAD_CSLM-MutableSegment.DEEP_OVERHEAD);
size = store.memstore.getFlushableSize();
assertEquals(kvSize, size);
MemStoreSize kvSize2 = new MemStoreSize();
MemStoreSizing kvSize2 = new MemStoreSizing();
store.add(new KeyValue(row, family, qf2, 2, (byte[])null), kvSize2);
kvSize2.incMemStoreSize(0, MutableSegment.DEEP_OVERHEAD);
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
@ -1217,7 +1217,7 @@ public class TestHStore {
byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
@ -1229,18 +1229,18 @@ public class TestHStore {
}
});
// The cells having the value0 won't be flushed to disk because the value of max version is 1
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize);
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSizing);
store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSizing);
store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSizing);
List<Cell> myList = new MyList<>(hook);
Scan scan = new Scan()
.withStartRow(r1)
@ -1276,13 +1276,13 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("value");
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
@ -1354,22 +1354,22 @@ public class TestHStore {
});
byte[] oldValue = Bytes.toBytes("oldValue");
byte[] currentValue = Bytes.toBytes("currentValue");
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSizing);
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSizing);
myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSizing);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
@ -1467,21 +1467,21 @@ public class TestHStore {
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
store.add(createCell(qf1, ts, seqId, value), memStoreSizing);
store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction.
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
//okay. Let the compaction be completed
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
@ -1490,9 +1490,9 @@ public class TestHStore {
TimeUnit.SECONDS.sleep(1);
}
// This should invoke another in-memory flush.
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSizing);
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
@ -1589,25 +1589,25 @@ public class TestHStore {
conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
// Set the lower threshold to invoke the "MERGE" policy
MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() {});
MemStoreSize memStoreSize = new MemStoreSize();
MemStoreSizing memStoreSizing = new MemStoreSizing();
long ts = System.currentTimeMillis();
long seqID = 1l;
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 11; i < 20; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 21; i < 30; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
memStoreSizing);
}
// flush them
flushStore(store, seqID);
@ -1624,14 +1624,14 @@ public class TestHStore {
// create more store files
for (int i = 31; i < 40; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
memStoreSizing);
}
// flush them
flushStore(store, seqID);
for (int i = 41; i < 50; i++) {
store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
memStoreSize);
memStoreSizing);
}
// flush them
flushStore(store, seqID);

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -77,8 +77,8 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MemStoreSize;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -543,8 +543,8 @@ public abstract class AbstractTestWALReplay {
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
@Override
protected void restoreEdit(HStore s, Cell cell, MemStoreSize memstoreSize) {
super.restoreEdit(s, cell, memstoreSize);
protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
super.restoreEdit(s, cell, memstoreSizing);
countOfRestoredEdits.incrementAndGet();
}
};