HBASE-16229 Cleaning up size and heapSize calculation.

This commit is contained in:
anoopsamjohn 2016-09-13 11:43:26 +05:30
parent 831fb3ccb8
commit 2ab3384669
17 changed files with 419 additions and 385 deletions

View File

@ -46,6 +46,12 @@ public class ClassSize {
/** Overhead for ArrayList(0) */ /** Overhead for ArrayList(0) */
public static final int ARRAYLIST; public static final int ARRAYLIST;
/** Overhead for LinkedList(0) */
public static final int LINKEDLIST;
/** Overhead for a single entry in LinkedList */
public static final int LINKEDLIST_ENTRY;
/** Overhead for ByteBuffer */ /** Overhead for ByteBuffer */
public static final int BYTE_BUFFER; public static final int BYTE_BUFFER;
@ -100,6 +106,9 @@ public class ClassSize {
/** Overhead for AtomicBoolean */ /** Overhead for AtomicBoolean */
public static final int ATOMIC_BOOLEAN; public static final int ATOMIC_BOOLEAN;
/** Overhead for AtomicReference */
public static final int ATOMIC_REFERENCE;
/** Overhead for CopyOnWriteArraySet */ /** Overhead for CopyOnWriteArraySet */
public static final int COPYONWRITE_ARRAYSET; public static final int COPYONWRITE_ARRAYSET;
@ -240,6 +249,10 @@ public class ClassSize {
ARRAYLIST = align(OBJECT + REFERENCE + (2 * Bytes.SIZEOF_INT)) + align(ARRAY); ARRAYLIST = align(OBJECT + REFERENCE + (2 * Bytes.SIZEOF_INT)) + align(ARRAY);
LINKEDLIST = align(OBJECT + (2 * Bytes.SIZEOF_INT) + (2 * REFERENCE));
LINKEDLIST_ENTRY = align(OBJECT + (2 * REFERENCE));
//noinspection PointlessArithmeticExpression //noinspection PointlessArithmeticExpression
BYTE_BUFFER = align(OBJECT + REFERENCE + BYTE_BUFFER = align(OBJECT + REFERENCE +
(5 * Bytes.SIZEOF_INT) + (5 * Bytes.SIZEOF_INT) +
@ -292,6 +305,8 @@ public class ClassSize {
ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN); ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
ATOMIC_REFERENCE = align(OBJECT + REFERENCE);
COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE); COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY); COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);

View File

@ -52,34 +52,29 @@ public abstract class AbstractMemStore implements MemStore {
private final CellComparator comparator; private final CellComparator comparator;
// active segment absorbs write operations // active segment absorbs write operations
private volatile MutableSegment active; protected volatile MutableSegment active;
// Snapshot of memstore. Made for flusher. // Snapshot of memstore. Made for flusher.
private volatile ImmutableSegment snapshot; protected volatile ImmutableSegment snapshot;
protected volatile long snapshotId; protected volatile long snapshotId;
// Used to track when to flush // Used to track when to flush
private volatile long timeOfOldestEdit; private volatile long timeOfOldestEdit;
public final static long FIXED_OVERHEAD = ClassSize.align( public final static long FIXED_OVERHEAD = ClassSize
ClassSize.OBJECT + .align(ClassSize.OBJECT + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
(4 * ClassSize.REFERENCE) +
(2 * Bytes.SIZEOF_LONG));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
(ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
protected AbstractMemStore(final Configuration conf, final CellComparator c) { protected AbstractMemStore(final Configuration conf, final CellComparator c) {
this.conf = conf; this.conf = conf;
this.comparator = c; this.comparator = c;
resetActive(); resetActive();
this.snapshot = SegmentFactory.instance().createImmutableSegment(c, 0); this.snapshot = SegmentFactory.instance().createImmutableSegment(c);
this.snapshotId = NO_SNAPSHOT_ID; this.snapshotId = NO_SNAPSHOT_ID;
} }
protected void resetActive() { protected void resetActive() {
// Reset heap to not include any keys // Reset heap to not include any keys
this.active = SegmentFactory.instance().createMutableSegment(conf, comparator, DEEP_OVERHEAD); this.active = SegmentFactory.instance().createMutableSegment(conf, comparator);
this.timeOfOldestEdit = Long.MAX_VALUE; this.timeOfOldestEdit = Long.MAX_VALUE;
} }
@ -200,8 +195,7 @@ public abstract class AbstractMemStore implements MemStore {
// create a new snapshot and let the old one go. // create a new snapshot and let the old one go.
Segment oldSnapshot = this.snapshot; Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) { if (!this.snapshot.isEmpty()) {
this.snapshot = SegmentFactory.instance().createImmutableSegment( this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
getComparator(), 0);
} }
this.snapshotId = NO_SNAPSHOT_ID; this.snapshotId = NO_SNAPSHOT_ID;
oldSnapshot.close(); oldSnapshot.close();
@ -213,12 +207,12 @@ public abstract class AbstractMemStore implements MemStore {
*/ */
@Override @Override
public long heapSize() { public long heapSize() {
return getActive().getSize(); return size();
} }
@Override @Override
public long getSnapshotSize() { public long getSnapshotSize() {
return getSnapshot().getSize(); return this.snapshot.keySize();
} }
@Override @Override
@ -385,7 +379,7 @@ public abstract class AbstractMemStore implements MemStore {
// so we cant add the new Cell w/o knowing what's there already, but we also // so we cant add the new Cell w/o knowing what's there already, but we also
// want to take this chance to delete some cells. So two loops (sad) // want to take this chance to delete some cells. So two loops (sad)
SortedSet<Cell> ss = getActive().tailSet(firstCell); SortedSet<Cell> ss = this.active.tailSet(firstCell);
for (Cell cell : ss) { for (Cell cell : ss) {
// if this isnt the row we are interested in, then bail: // if this isnt the row we are interested in, then bail:
if (!CellUtil.matchingColumn(cell, family, qualifier) if (!CellUtil.matchingColumn(cell, family, qualifier)
@ -433,38 +427,33 @@ public abstract class AbstractMemStore implements MemStore {
} }
} }
/**
* @return The size of the active segment. Means sum of all cell's size.
*/
protected long keySize() { protected long keySize() {
return heapSize() - DEEP_OVERHEAD; return this.active.keySize();
} }
protected CellComparator getComparator() { protected CellComparator getComparator() {
return comparator; return comparator;
} }
protected MutableSegment getActive() { @VisibleForTesting
MutableSegment getActive() {
return active; return active;
} }
protected ImmutableSegment getSnapshot() { @VisibleForTesting
ImmutableSegment getSnapshot() {
return snapshot; return snapshot;
} }
protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) {
this.snapshot = snapshot;
return this;
}
protected void setSnapshotSize(long snapshotSize) {
getSnapshot().setSize(snapshotSize);
}
/** /**
* Check whether anything need to be done based on the current active set size * Check whether anything need to be done based on the current active set size
*/ */
protected abstract void checkActiveSize(); protected abstract void checkActiveSize();
/** /**
* Returns an ordered list of segments from most recent to oldest in memstore
* @return an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore
*/ */
protected abstract List<Segment> getSegments() throws IOException; protected abstract List<Segment> getSegments() throws IOException;

View File

@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -52,13 +51,6 @@ import org.apache.hadoop.hbase.wal.WAL;
@InterfaceAudience.Private @InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore { public class CompactingMemStore extends AbstractMemStore {
public final static long DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM = ClassSize.align(
ClassSize.TIMERANGE_TRACKER + ClassSize.CELL_SET + ClassSize.CONCURRENT_SKIPLISTMAP);
public final static long DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM = ClassSize.align(
ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE +
ClassSize.CELL_SET + ClassSize.CELL_ARRAY_MAP);
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor"; "hbase.memstore.inmemoryflush.threshold.factor";
@ -75,6 +67,13 @@ public class CompactingMemStore extends AbstractMemStore {
@VisibleForTesting @VisibleForTesting
private final AtomicBoolean allowCompaction = new AtomicBoolean(true); private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
public static final long DEEP_OVERHEAD = AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
// MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
+ CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD;
public CompactingMemStore(Configuration conf, CellComparator c, public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices) throws IOException { HStore store, RegionServicesForStores regionServices) throws IOException {
super(conf, c); super(conf, c);
@ -100,28 +99,18 @@ public class CompactingMemStore extends AbstractMemStore {
LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize); LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize);
} }
public static long getSegmentSize(Segment segment) {
return segment.keySize();
}
public static long getSegmentsSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += getSegmentSize(segment);
}
return res;
}
/** /**
* @return Total memory occupied by this MemStore. * @return Total memory occupied by this MemStore. This includes active segment size and heap size
* This is not thread safe and the memstore may be changed while computing its size. * overhead of this memstore but won't include any size occupied by the snapshot. We
* It is the responsibility of the caller to make sure this doesn't happen. * assume the snapshot will get cleared soon. This is not thread safe and the memstore may
* be changed while computing its size. It is the responsibility of the caller to make
* sure this doesn't happen.
*/ */
@Override @Override
public long size() { public long size() {
long res = 0; long res = DEEP_OVERHEAD + this.active.size();
for (Segment item : getSegments()) { for (Segment item : pipeline.getSegments()) {
res += item.getSize(); res += CompactionPipeline.ENTRY_OVERHEAD + item.size();
} }
return res; return res;
} }
@ -131,11 +120,13 @@ public class CompactingMemStore extends AbstractMemStore {
* The store may do any post-flush actions at this point. * The store may do any post-flush actions at this point.
* One example is to update the WAL with sequence number that is known only at the store level. * One example is to update the WAL with sequence number that is known only at the store level.
*/ */
@Override public void finalizeFlush() { @Override
public void finalizeFlush() {
updateLowestUnflushedSequenceIdInWAL(false); updateLowestUnflushedSequenceIdInWAL(false);
} }
@Override public boolean isSloppy() { @Override
public boolean isSloppy() {
return true; return true;
} }
@ -148,10 +139,9 @@ public class CompactingMemStore extends AbstractMemStore {
*/ */
@Override @Override
public MemStoreSnapshot snapshot() { public MemStoreSnapshot snapshot() {
MutableSegment active = getActive();
// If snapshot currently has entries, then flusher failed or didn't call // If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning. // cleanup. Log a warning.
if (!getSnapshot().isEmpty()) { if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " + LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?"); "Doing nothing. Another ongoing flush or did we fail last attempt?");
} else { } else {
@ -161,21 +151,22 @@ public class CompactingMemStore extends AbstractMemStore {
+ getFamilyName()); + getFamilyName());
} }
stopCompaction(); stopCompaction();
pushActiveToPipeline(active); pushActiveToPipeline(this.active);
snapshotId = EnvironmentEdgeManager.currentTime(); snapshotId = EnvironmentEdgeManager.currentTime();
pushTailToSnapshot(); pushTailToSnapshot();
} }
return new MemStoreSnapshot(snapshotId, getSnapshot()); return new MemStoreSnapshot(snapshotId, this.snapshot);
} }
/** /**
* On flush, how much memory we will clear. * On flush, how much memory we will clear.
* @return size of data that is going to be flushed * @return size of data that is going to be flushed
*/ */
@Override public long getFlushableSize() { @Override
long snapshotSize = getSnapshot().getSize(); public long getFlushableSize() {
if(snapshotSize == 0) { long snapshotSize = getSnapshotSize();
//if snapshot is empty the tail of the pipeline is flushed if (snapshotSize == 0) {
// if snapshot is empty the tail of the pipeline is flushed
snapshotSize = pipeline.getTailSize(); snapshotSize = pipeline.getTailSize();
} }
return snapshotSize > 0 ? snapshotSize : keySize(); return snapshotSize > 0 ? snapshotSize : keySize();
@ -186,7 +177,7 @@ public class CompactingMemStore extends AbstractMemStore {
long minSequenceId = pipeline.getMinSequenceId(); long minSequenceId = pipeline.getMinSequenceId();
if(minSequenceId != Long.MAX_VALUE) { if(minSequenceId != Long.MAX_VALUE) {
byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
byte[] familyName = getFamilyNameInByte(); byte[] familyName = getFamilyNameInBytes();
WAL WAL = getRegionServices().getWAL(); WAL WAL = getRegionServices().getWAL();
if (WAL != null) { if (WAL != null) {
WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
@ -197,10 +188,10 @@ public class CompactingMemStore extends AbstractMemStore {
@Override @Override
public List<Segment> getSegments() { public List<Segment> getSegments() {
List<Segment> pipelineList = pipeline.getSegments(); List<Segment> pipelineList = pipeline.getSegments();
List<Segment> list = new LinkedList<Segment>(); List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
list.add(getActive()); list.add(this.active);
list.addAll(pipelineList); list.addAll(pipelineList);
list.add(getSnapshot()); list.add(this.snapshot);
return list; return list;
} }
@ -235,7 +226,7 @@ public class CompactingMemStore extends AbstractMemStore {
} }
public String getFamilyName() { public String getFamilyName() {
return Bytes.toString(getFamilyNameInByte()); return Bytes.toString(getFamilyNameInBytes());
} }
@Override @Override
@ -248,12 +239,12 @@ public class CompactingMemStore extends AbstractMemStore {
// The list of elements in pipeline + the active element + the snapshot segment // The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element // TODO : This will change when the snapshot is made of more than one element
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2); List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
list.add(getActive().getScanner(readPt, order + 1)); list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) { for (Segment item : pipelineList) {
list.add(item.getScanner(readPt, order)); list.add(item.getScanner(readPt, order));
order--; order--;
} }
list.add(getSnapshot().getScanner(readPt, order)); list.add(this.snapshot.getScanner(readPt, order));
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list)); return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
} }
@ -291,11 +282,10 @@ public class CompactingMemStore extends AbstractMemStore {
// Phase I: Update the pipeline // Phase I: Update the pipeline
getRegionServices().blockUpdates(); getRegionServices().blockUpdates();
try { try {
MutableSegment active = getActive();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
} }
pushActiveToPipeline(active); pushActiveToPipeline(this.active);
} finally { } finally {
getRegionServices().unblockUpdates(); getRegionServices().unblockUpdates();
} }
@ -319,7 +309,7 @@ public class CompactingMemStore extends AbstractMemStore {
} }
} }
private byte[] getFamilyNameInByte() { private byte[] getFamilyNameInBytes() {
return store.getFamily().getName(); return store.getFamily().getName();
} }
@ -328,7 +318,7 @@ public class CompactingMemStore extends AbstractMemStore {
} }
private boolean shouldFlushInMemory() { private boolean shouldFlushInMemory() {
if(getActive().getSize() > inmemoryFlushSize) { // size above flush threshold if (this.active.size() > inmemoryFlushSize) { // size above flush threshold
// the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
// the insert of the active into the compaction pipeline // the insert of the active into the compaction pipeline
return (inMemoryFlushInProgress.compareAndSet(false,true)); return (inMemoryFlushInProgress.compareAndSet(false,true));
@ -350,8 +340,6 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushActiveToPipeline(MutableSegment active) { private void pushActiveToPipeline(MutableSegment active) {
if (!active.isEmpty()) { if (!active.isEmpty()) {
long delta = DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM - DEEP_OVERHEAD;
active.incSize(delta);
pipeline.pushHead(active); pipeline.pushHead(active);
resetActive(); resetActive();
} }
@ -360,9 +348,7 @@ public class CompactingMemStore extends AbstractMemStore {
private void pushTailToSnapshot() { private void pushTailToSnapshot() {
ImmutableSegment tail = pipeline.pullTail(); ImmutableSegment tail = pipeline.pullTail();
if (!tail.isEmpty()) { if (!tail.isEmpty()) {
setSnapshot(tail); this.snapshot = tail;
long size = getSegmentSize(tail);
setSnapshotSize(size);
} }
} }
@ -428,7 +414,7 @@ public class CompactingMemStore extends AbstractMemStore {
// debug method // debug method
public void debug() { public void debug() {
String msg = "active size="+getActive().getSize(); String msg = "active size=" + this.active.size();
msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize; msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize;
msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false"); msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");

View File

@ -24,7 +24,10 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/** /**
* The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@ -39,13 +42,17 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public class CompactionPipeline { public class CompactionPipeline {
private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
public final static long FIXED_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
private final RegionServicesForStores region; private final RegionServicesForStores region;
private LinkedList<ImmutableSegment> pipeline; private LinkedList<ImmutableSegment> pipeline;
private long version; private long version;
private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
.createImmutableSegment(null, .createImmutableSegment((CellComparator) null);
CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM);
public CompactionPipeline(RegionServicesForStores region) { public CompactionPipeline(RegionServicesForStores region) {
this.region = region; this.region = region;
@ -105,8 +112,8 @@ public class CompactionPipeline {
} }
if (region != null) { if (region != null) {
// update the global memstore size counter // update the global memstore size counter
long suffixSize = CompactingMemStore.getSegmentsSize(suffix); long suffixSize = getSegmentsKeySize(suffix);
long newSize = CompactingMemStore.getSegmentSize(segment); long newSize = segment.keySize();
long delta = suffixSize - newSize; long delta = suffixSize - newSize;
long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -117,6 +124,14 @@ public class CompactionPipeline {
return true; return true;
} }
private static long getSegmentsKeySize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += segment.keySize();
}
return res;
}
/** /**
* If the caller holds the current version, go over the the pipeline and try to flatten each * If the caller holds the current version, go over the the pipeline and try to flatten each
* segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based. * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
@ -178,20 +193,20 @@ public class CompactionPipeline {
public long getMinSequenceId() { public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE; long minSequenceId = Long.MAX_VALUE;
if(!isEmpty()) { if (!isEmpty()) {
minSequenceId = pipeline.getLast().getMinSequenceId(); minSequenceId = pipeline.getLast().getMinSequenceId();
} }
return minSequenceId; return minSequenceId;
} }
public long getTailSize() { public long getTailSize() {
if(isEmpty()) return 0; if (isEmpty()) return 0;
return CompactingMemStore.getSegmentSize(pipeline.peekLast()); return pipeline.peekLast().keySize();
} }
private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) { private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
version++; version++;
for(Segment itemInSuffix : suffix) { for (Segment itemInSuffix : suffix) {
itemInSuffix.close(); itemInSuffix.close();
} }
pipeline.removeAll(suffix); pipeline.removeAll(suffix);

View File

@ -83,20 +83,19 @@ public class DefaultMemStore extends AbstractMemStore {
public MemStoreSnapshot snapshot() { public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call // If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning. // cleanup. Log a warning.
if (!getSnapshot().isEmpty()) { if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " + LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?"); "Doing nothing. Another ongoing flush or did we fail last attempt?");
} else { } else {
this.snapshotId = EnvironmentEdgeManager.currentTime(); this.snapshotId = EnvironmentEdgeManager.currentTime();
if (!getActive().isEmpty()) { if (!this.active.isEmpty()) {
ImmutableSegment immutableSegment = SegmentFactory.instance(). ImmutableSegment immutableSegment = SegmentFactory.instance().
createImmutableSegment(getActive()); createImmutableSegment(this.active);
setSnapshot(immutableSegment); this.snapshot = immutableSegment;
setSnapshotSize(keySize());
resetActive(); resetActive();
} }
} }
return new MemStoreSnapshot(this.snapshotId, getSnapshot()); return new MemStoreSnapshot(this.snapshotId, this.snapshot);
} }
/** /**
@ -106,7 +105,7 @@ public class DefaultMemStore extends AbstractMemStore {
*/ */
@Override @Override
public long getFlushableSize() { public long getFlushableSize() {
long snapshotSize = getSnapshot().getSize(); long snapshotSize = getSnapshotSize();
return snapshotSize > 0 ? snapshotSize : keySize(); return snapshotSize > 0 ? snapshotSize : keySize();
} }
@ -116,8 +115,8 @@ public class DefaultMemStore extends AbstractMemStore {
*/ */
public List<KeyValueScanner> getScanners(long readPt) throws IOException { public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2); List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
list.add(getActive().getScanner(readPt, 1)); list.add(this.active.getScanner(readPt, 1));
list.add(getSnapshot().getScanner(readPt, 0)); list.add(this.snapshot.getScanner(readPt, 0));
return Collections.<KeyValueScanner> singletonList( return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner(getComparator(), list)); new MemStoreScanner(getComparator(), list));
} }
@ -125,8 +124,8 @@ public class DefaultMemStore extends AbstractMemStore {
@Override @Override
protected List<Segment> getSegments() throws IOException { protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<Segment>(2); List<Segment> list = new ArrayList<Segment>(2);
list.add(getActive()); list.add(this.active);
list.add(getSnapshot()); list.add(this.snapshot);
return list; return list;
} }
@ -137,19 +136,16 @@ public class DefaultMemStore extends AbstractMemStore {
*/ */
Cell getNextRow(final Cell cell) { Cell getNextRow(final Cell cell) {
return getLowest( return getLowest(
getNextRow(cell, getActive().getCellSet()), getNextRow(cell, this.active.getCellSet()),
getNextRow(cell, getSnapshot().getCellSet())); getNextRow(cell, this.snapshot.getCellSet()));
} }
@Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) { @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
} }
/**
* @return Total memory occupied by this MemStore.
*/
@Override @Override
public long size() { public long size() {
return heapSize(); return this.active.size() + DEEP_OVERHEAD;
} }
/** /**

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -37,11 +38,12 @@ import java.io.IOException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ImmutableSegment extends Segment { public class ImmutableSegment extends Segment {
/**
* This is an immutable segment so use the read-only TimeRange rather than the heavy-weight private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
* TimeRangeTracker with all its synchronization when doing time range stuff. + (2 * ClassSize.REFERENCE) // Refs to timeRange and type
*/ + ClassSize.TIMERANGE;
private final TimeRange timeRange; public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
/** /**
* Types of ImmutableSegment * Types of ImmutableSegment
@ -51,6 +53,12 @@ public class ImmutableSegment extends Segment {
ARRAY_MAP_BASED, ARRAY_MAP_BASED,
} }
/**
* This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
* TimeRangeTracker with all its synchronization when doing time range stuff.
*/
private final TimeRange timeRange;
private Type type = Type.SKIPLIST_MAP_BASED; private Type type = Type.SKIPLIST_MAP_BASED;
// whether it is based on CellFlatMap or ConcurrentSkipListMap // whether it is based on CellFlatMap or ConcurrentSkipListMap
@ -66,9 +74,8 @@ public class ImmutableSegment extends Segment {
*/ */
protected ImmutableSegment(Segment segment) { protected ImmutableSegment(Segment segment) {
super(segment); super(segment);
type = Type.SKIPLIST_MAP_BASED; this.type = Type.SKIPLIST_MAP_BASED;
TimeRangeTracker trt = getTimeRangeTracker(); this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
this.timeRange = trt == null? null: trt.toTimeRange();
} }
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
@ -80,20 +87,14 @@ public class ImmutableSegment extends Segment {
*/ */
protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type) { MemStoreLAB memStoreLAB, int numOfCells, Type type) {
super(null, // initiailize the CellSet with NULL
super(null, // initiailize the CellSet with NULL comparator, memStoreLAB);
comparator, memStoreLAB, this.type = type;
// initial size of segment metadata (the data per cell is added in createCellArrayMapSet)
CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM,
ClassSize.CELL_ARRAY_MAP_ENTRY);
// build the true CellSet based on CellArrayMap // build the true CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator); CellSet cs = createCellArrayMapSet(numOfCells, iterator);
this.setCellSet(null, cs); // update the CellSet of the new Segment this.setCellSet(null, cs); // update the CellSet of the new Segment
this.type = type; this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
TimeRangeTracker trt = getTimeRangeTracker();
this.timeRange = trt == null? null: trt.toTimeRange();
} }
/**------------------------------------------------------------------------ /**------------------------------------------------------------------------
@ -101,15 +102,11 @@ public class ImmutableSegment extends Segment {
* list of older ImmutableSegments. * list of older ImmutableSegments.
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected ImmutableSegment( protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
CellComparator comparator, MemStoreCompactorIterator iterator, MemStoreLAB memStoreLAB) { MemStoreLAB memStoreLAB) {
super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
super(new CellSet(comparator), // initiailize the CellSet with empty CellSet comparator, memStoreLAB);
comparator, memStoreLAB, type = Type.SKIPLIST_MAP_BASED;
// initial size of segment metadata (the data per cell is added in internalAdd)
CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM,
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
while (iterator.hasNext()) { while (iterator.hasNext()) {
Cell c = iterator.next(); Cell c = iterator.next();
// The scanner is doing all the elimination logic // The scanner is doing all the elimination logic
@ -118,9 +115,7 @@ public class ImmutableSegment extends Segment {
boolean usedMSLAB = (newKV != c); boolean usedMSLAB = (newKV != c);
internalAdd(newKV, usedMSLAB); // internalAdd(newKV, usedMSLAB); //
} }
type = Type.SKIPLIST_MAP_BASED; this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
TimeRangeTracker trt = getTimeRangeTracker();
this.timeRange = trt == null? null: trt.toTimeRange();
} }
///////////////////// PUBLIC METHODS ///////////////////// ///////////////////// PUBLIC METHODS /////////////////////
@ -144,14 +139,16 @@ public class ImmutableSegment extends Segment {
return this.timeRange.getMin(); return this.timeRange.getMin();
} }
@Override @Override
public long keySize() { public long size() {
switch (type){ switch (this.type) {
case SKIPLIST_MAP_BASED: case SKIPLIST_MAP_BASED:
return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM; return keySize() + DEEP_OVERHEAD_CSLM;
case ARRAY_MAP_BASED: case ARRAY_MAP_BASED:
return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_CELL_ARRAY_ITEM; return keySize() + DEEP_OVERHEAD_CAM;
default: throw new IllegalStateException(); default:
throw new RuntimeException("Unknown type " + type);
} }
} }
@ -171,9 +168,6 @@ public class ImmutableSegment extends Segment {
CellSet oldCellSet = getCellSet(); CellSet oldCellSet = getCellSet();
int numOfCells = getCellsCount(); int numOfCells = getCellsCount();
// each Cell is now represented in CellArrayMap
constantCellMetaDataSize = ClassSize.CELL_ARRAY_MAP_ENTRY;
// build the new (CellSet CellArrayMap based) // build the new (CellSet CellArrayMap based)
CellSet newCellSet = recreateCellArrayMapSet(numOfCells); CellSet newCellSet = recreateCellArrayMapSet(numOfCells);
type = Type.ARRAY_MAP_BASED; type = Type.ARRAY_MAP_BASED;
@ -214,6 +208,19 @@ public class ImmutableSegment extends Segment {
return new CellSet(cam); return new CellSet(cam);
} }
protected long heapSizeChange(Cell cell, boolean succ) {
if (succ) {
switch (this.type) {
case SKIPLIST_MAP_BASED:
return ClassSize
.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
case ARRAY_MAP_BASED:
return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
}
}
return 0;
}
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet // Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
// (without compacting iterator) // (without compacting iterator)
@ -239,5 +246,4 @@ public class ImmutableSegment extends Segment {
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false); CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
return new CellSet(cam); return new CellSet(cam);
} }
} }

View File

@ -126,7 +126,11 @@ public interface MemStore extends HeapSize {
List<KeyValueScanner> getScanners(long readPt) throws IOException; List<KeyValueScanner> getScanners(long readPt) throws IOException;
/** /**
* @return Total memory occupied by this MemStore. * @return Total memory occupied by this MemStore. This includes active segment size and heap size
* overhead of this memstore but won't include any size occupied by the snapshot. We
* assume the snapshot will get cleared soon. This is not thread safe and the memstore may
* be changed while computing its size. It is the responsibility of the caller to make
* sure this doesn't happen.
*/ */
long size(); long size();

View File

@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -38,7 +40,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
* therefore no special synchronization is required. * therefore no special synchronization is required.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class MemStoreCompactor { public class MemStoreCompactor {
public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_DOUBLE
+ ClassSize.ATOMIC_BOOLEAN);
// Option for external guidance whether flattening is allowed // Option for external guidance whether flattening is allowed
static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten";
@ -59,6 +65,15 @@ class MemStoreCompactor {
static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false;
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
/**
* Types of Compaction
*/
private enum Type {
COMPACT_TO_SKIPLIST_MAP,
COMPACT_TO_ARRAY_MAP
}
private CompactingMemStore compactingMemStore; private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline // a static version of the segment list from the pipeline
@ -73,13 +88,6 @@ class MemStoreCompactor {
double fraction = 0.8; double fraction = 0.8;
int immutCellsNum = 0; // number of immutable for compaction cells int immutCellsNum = 0; // number of immutable for compaction cells
/**
* Types of Compaction
*/
private enum Type {
COMPACT_TO_SKIPLIST_MAP,
COMPACT_TO_ARRAY_MAP
}
private Type type = Type.COMPACT_TO_ARRAY_MAP; private Type type = Type.COMPACT_TO_ARRAY_MAP;

View File

@ -36,7 +36,7 @@ public class MemStoreSnapshot {
public MemStoreSnapshot(long id, ImmutableSegment snapshot) { public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
this.id = id; this.id = id;
this.cellsCount = snapshot.getCellsCount(); this.cellsCount = snapshot.getCellsCount();
this.size = snapshot.getSize(); this.size = snapshot.keySize();
this.timeRangeTracker = snapshot.getTimeRangeTracker(); this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanner = snapshot.getKeyValueScanner(); this.scanner = snapshot.getKeyValueScanner();
this.tagsPresent = snapshot.isTagsPresent(); this.tagsPresent = snapshot.isTagsPresent();

View File

@ -21,17 +21,21 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.ClassSize;
import com.google.common.annotations.VisibleForTesting;
/** /**
* A mutable segment in memstore, specifically the active segment. * A mutable segment in memstore, specifically the active segment.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class MutableSegment extends Segment { public class MutableSegment extends Segment {
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB,
long size) { public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
super(cellSet, comparator, memStoreLAB, size, ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB);
} }
/** /**
@ -44,29 +48,28 @@ public class MutableSegment extends Segment {
return internalAdd(cell, mslabUsed); return internalAdd(cell, mslabUsed);
} }
//methods for test
/** /**
* Returns the first cell in the segment * Returns the first cell in the segment
* @return the first cell in the segment * @return the first cell in the segment
*/ */
@VisibleForTesting
Cell first() { Cell first() {
return this.getCellSet().first(); return this.getCellSet().first();
} }
@Override @Override
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
return (getTimeRangeTracker().includesTimeRange(scan.getTimeRange()) return (this.timeRangeTracker.includesTimeRange(scan.getTimeRange())
&& (getTimeRangeTracker().getMax() >= oldestUnexpiredTS)); && (this.timeRangeTracker.getMax() >= oldestUnexpiredTS));
} }
@Override @Override
public long getMinTimestamp() { public long getMinTimestamp() {
return getTimeRangeTracker().getMin(); return this.timeRangeTracker.getMin();
} }
@Override @Override
public long keySize() { public long size() {
return size.get() - CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM; return keySize() + DEEP_OVERHEAD;
} }
} }

View File

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -47,28 +47,31 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class Segment { public abstract class Segment {
private static final Log LOG = LogFactory.getLog(Segment.class); final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ 5 * ClassSize.REFERENCE // cellSet, comparator, memStoreLAB, size, timeRangeTracker
+ Bytes.SIZEOF_LONG // minSequenceId
+ Bytes.SIZEOF_BOOLEAN); // tagsPresent
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
+ ClassSize.CELL_SET + ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER;
private AtomicReference<CellSet> cellSet= new AtomicReference<CellSet>(); private AtomicReference<CellSet> cellSet= new AtomicReference<CellSet>();
private final CellComparator comparator; private final CellComparator comparator;
private long minSequenceId; private long minSequenceId;
private volatile MemStoreLAB memStoreLAB; private MemStoreLAB memStoreLAB;
/* The size includes everything allocated for this segment, // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
* use keySize() to get only size of the cells */ // including the heap overhead of this class.
protected final AtomicLong size; protected final AtomicLong size;
protected final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent; protected volatile boolean tagsPresent;
private final TimeRangeTracker timeRangeTracker;
protected long constantCellMetaDataSize;
protected Segment( // This constructor is used to create empty Segments.
CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, long size, protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
long constantCellSize) {
this.cellSet.set(cellSet); this.cellSet.set(cellSet);
this.comparator = comparator; this.comparator = comparator;
this.minSequenceId = Long.MAX_VALUE; this.minSequenceId = Long.MAX_VALUE;
this.memStoreLAB = memStoreLAB; this.memStoreLAB = memStoreLAB;
this.size = new AtomicLong(size); this.size = new AtomicLong(0);
this.tagsPresent = false; this.tagsPresent = false;
this.constantCellMetaDataSize = constantCellSize;
this.timeRangeTracker = new TimeRangeTracker(); this.timeRangeTracker = new TimeRangeTracker();
} }
@ -77,9 +80,8 @@ public abstract class Segment {
this.comparator = segment.getComparator(); this.comparator = segment.getComparator();
this.minSequenceId = segment.getMinSequenceId(); this.minSequenceId = segment.getMinSequenceId();
this.memStoreLAB = segment.getMemStoreLAB(); this.memStoreLAB = segment.getMemStoreLAB();
this.size = new AtomicLong(segment.getSize()); this.size = new AtomicLong(segment.keySize());
this.tagsPresent = segment.isTagsPresent(); this.tagsPresent = segment.isTagsPresent();
this.constantCellMetaDataSize = segment.getConstantCellMetaDataSize();
this.timeRangeTracker = segment.getTimeRangeTracker(); this.timeRangeTracker = segment.getTimeRangeTracker();
} }
@ -100,7 +102,6 @@ public abstract class Segment {
} }
/** /**
* Returns whether the segment has any cells
* @return whether the segment has any cells * @return whether the segment has any cells
*/ */
public boolean isEmpty() { public boolean isEmpty() {
@ -108,7 +109,6 @@ public abstract class Segment {
} }
/** /**
* Returns number of cells in segment
* @return number of cells in segment * @return number of cells in segment
*/ */
public int getCellsCount() { public int getCellsCount() {
@ -116,7 +116,6 @@ public abstract class Segment {
} }
/** /**
* Returns the first cell in the segment that has equal or greater key than the given cell
* @return the first cell in the segment that has equal or greater key than the given cell * @return the first cell in the segment that has equal or greater key than the given cell
*/ */
public Cell getFirstAfter(Cell cell) { public Cell getFirstAfter(Cell cell) {
@ -131,9 +130,8 @@ public abstract class Segment {
* Closing a segment before it is being discarded * Closing a segment before it is being discarded
*/ */
public void close() { public void close() {
MemStoreLAB mslab = getMemStoreLAB(); if (this.memStoreLAB != null) {
if(mslab != null) { this.memStoreLAB.close();
mslab.close();
} }
// do not set MSLab to null as scanners may still be reading the data here and need to decrease // do not set MSLab to null as scanners may still be reading the data here and need to decrease
// the counter when they finish // the counter when they finish
@ -145,12 +143,12 @@ public abstract class Segment {
* @return either the given cell or its clone * @return either the given cell or its clone
*/ */
public Cell maybeCloneWithAllocator(Cell cell) { public Cell maybeCloneWithAllocator(Cell cell) {
if (getMemStoreLAB() == null) { if (this.memStoreLAB == null) {
return cell; return cell;
} }
int len = getCellLength(cell); int len = getCellLength(cell);
ByteRange alloc = getMemStoreLAB().allocateBytes(len); ByteRange alloc = this.memStoreLAB.allocateBytes(len);
if (alloc == null) { if (alloc == null) {
// The allocation was too large, allocator decided // The allocation was too large, allocator decided
// not to do anything with it. // not to do anything with it.
@ -180,27 +178,17 @@ public abstract class Segment {
} }
public void incScannerCount() { public void incScannerCount() {
if(getMemStoreLAB() != null) { if (this.memStoreLAB != null) {
getMemStoreLAB().incScannerCount(); this.memStoreLAB.incScannerCount();
} }
} }
public void decScannerCount() { public void decScannerCount() {
if(getMemStoreLAB() != null) { if (this.memStoreLAB != null) {
getMemStoreLAB().decScannerCount(); this.memStoreLAB.decScannerCount();
} }
} }
/**
* Setting the heap size of the segment - used to account for different class overheads
* @return this object
*/
public Segment setSize(long size) {
this.size.set(size);
return this;
}
/** /**
* Setting the CellSet of the segment - used only for flat immutable segment for setting * Setting the CellSet of the segment - used only for flat immutable segment for setting
* immutable CellSet after its creation in immutable segment constructor * immutable CellSet after its creation in immutable segment constructor
@ -212,22 +200,23 @@ public abstract class Segment {
return this; return this;
} }
/* return only cell's heap size */ /**
public abstract long keySize(); * @return Sum of all cell's size.
*/
public long keySize() {
return this.size.get();
}
/** /**
* Returns the heap size of the segment
* @return the heap size of the segment * @return the heap size of the segment
*/ */
public long getSize() { public abstract long size();
return size.get();
}
/** /**
* Updates the heap size counter of the segment by the given delta * Updates the heap size counter of the segment by the given delta
*/ */
public void incSize(long delta) { public void incSize(long delta) {
size.addAndGet(delta); this.size.addAndGet(delta);
} }
public long getMinSequenceId() { public long getMinSequenceId() {
@ -260,7 +249,6 @@ public abstract class Segment {
} }
/** /**
* Returns a set of all cells in the segment
* @return a set of all cells in the segment * @return a set of all cells in the segment
*/ */
protected CellSet getCellSet() { protected CellSet getCellSet() {
@ -302,6 +290,11 @@ public abstract class Segment {
return s; return s;
} }
protected long heapSizeChange(Cell cell, boolean succ) {
return succ ? ClassSize
.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)) : 0;
}
/** /**
* Returns a subset of the segment cell set, which starts with the given cell * Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment * @param firstCell a cell in the segment
@ -312,7 +305,7 @@ public abstract class Segment {
} }
@VisibleForTesting @VisibleForTesting
public MemStoreLAB getMemStoreLAB() { MemStoreLAB getMemStoreLAB() {
return memStoreLAB; return memStoreLAB;
} }
@ -326,29 +319,13 @@ public abstract class Segment {
} }
} }
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.
* @param cell
* @param notPresent True if the cell was NOT present in the set.
* @return change in size
*/
protected long heapSizeChange(final Cell cell, final boolean notPresent){
return
notPresent ?
ClassSize.align(constantCellMetaDataSize + CellUtil.estimatedHeapSizeOf(cell)) : 0;
}
public long getConstantCellMetaDataSize() {
return this.constantCellMetaDataSize;
}
@Override @Override
public String toString() { public String toString() {
String res = "Store segment of type "+this.getClass().getName()+"; "; String res = "Store segment of type "+this.getClass().getName()+"; ";
res += "isEmpty "+(isEmpty()?"yes":"no")+"; "; res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
res += "cellCount "+getCellsCount()+"; "; res += "cellCount "+getCellsCount()+"; ";
res += "size "+getSize()+"; "; res += "cellsSize "+keySize()+"; ";
res += "heapSize "+size()+"; ";
res += "Min ts "+getMinTimestamp()+"; "; res += "Min ts "+getMinTimestamp()+"; ";
return res; return res;
} }

View File

@ -39,6 +39,7 @@ public final class SegmentFactory {
private SegmentFactory() {} private SegmentFactory() {}
private static SegmentFactory instance = new SegmentFactory(); private static SegmentFactory instance = new SegmentFactory();
public static SegmentFactory instance() { public static SegmentFactory instance() {
return instance; return instance;
} }
@ -46,47 +47,43 @@ public final class SegmentFactory {
// create skip-list-based (non-flat) immutable segment from compacting old immutable segments // create skip-list-based (non-flat) immutable segment from compacting old immutable segments
public ImmutableSegment createImmutableSegment(final Configuration conf, public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreCompactorIterator iterator) { final CellComparator comparator, MemStoreCompactorIterator iterator) {
MemStoreLAB memStoreLAB = getMemStoreLAB(conf); return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf));
return }
new ImmutableSegment(comparator, iterator, memStoreLAB);
// create new flat immutable segment from compacting old immutable segment
public ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells,
ImmutableSegment.Type segmentType) throws IOException {
Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED,
"wrong immutable segment type");
return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf), numOfCells,
segmentType);
} }
// create empty immutable segment // create empty immutable segment
public ImmutableSegment createImmutableSegment(CellComparator comparator, long size) { public ImmutableSegment createImmutableSegment(CellComparator comparator) {
MutableSegment segment = generateMutableSegment(null, comparator, null, size); MutableSegment segment = generateMutableSegment(null, comparator, null);
return createImmutableSegment(segment); return createImmutableSegment(segment);
} }
// create immutable segment from mutable // create immutable segment from mutable segment
public ImmutableSegment createImmutableSegment(MutableSegment segment) { public ImmutableSegment createImmutableSegment(MutableSegment segment) {
return new ImmutableSegment(segment); return new ImmutableSegment(segment);
} }
// create mutable segment // create mutable segment
public MutableSegment createMutableSegment(final Configuration conf, public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) {
CellComparator comparator, long size) {
MemStoreLAB memStoreLAB = getMemStoreLAB(conf); MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
return generateMutableSegment(conf, comparator, memStoreLAB, size); return generateMutableSegment(conf, comparator, memStoreLAB);
}
// create new flat immutable segment from compacting old immutable segment
public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType)
throws IOException {
Preconditions.checkArgument(
segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type");
MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
return
new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType);
} }
//****** private methods to instantiate concrete store segments **********// //****** private methods to instantiate concrete store segments **********//
private MutableSegment generateMutableSegment( private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) { MemStoreLAB memStoreLAB) {
// TBD use configuration to set type of segment // TBD use configuration to set type of segment
CellSet set = new CellSet(comparator); CellSet set = new CellSet(comparator);
return new MutableSegment(set, comparator, memStoreLAB, size); return new MutableSegment(set, comparator, memStoreLAB);
} }
private MemStoreLAB getMemStoreLAB(Configuration conf) { private MemStoreLAB getMemStoreLAB(Configuration conf) {

View File

@ -40,6 +40,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -49,6 +50,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -310,19 +312,107 @@ public class TestHeapSize {
// DefaultMemStore Deep Overhead // DefaultMemStore Deep Overhead
actual = DefaultMemStore.DEEP_OVERHEAD; actual = DefaultMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false); expected = ClassSize.estimateBase(cl, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
// CompactingMemStore Deep Overhead
cl = CompactingMemStore.class;
actual = CompactingMemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
expected += ClassSize.estimateBase(CompactionPipeline.class, false);
expected += ClassSize.estimateBase(LinkedList.class, false);
expected += ClassSize.estimateBase(MemStoreCompactor.class, false);
expected += ClassSize.estimateBase(AtomicBoolean.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicBoolean.class, true);
ClassSize.estimateBase(AtomicBoolean.class, true);
ClassSize.estimateBase(CompactionPipeline.class, true);
ClassSize.estimateBase(LinkedList.class, true);
ClassSize.estimateBase(MemStoreCompactor.class, true);
ClassSize.estimateBase(AtomicBoolean.class, true);
assertEquals(expected, actual);
}
// Segment Deep overhead
cl = Segment.class;
actual = Segment.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false); expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false); expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
if(expected != actual) { if (expected != actual) {
ClassSize.estimateBase(cl, true); ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true);
assertEquals(expected, actual); assertEquals(expected, actual);
} }
// MutableSegment Deep overhead
cl = MutableSegment.class;
actual = MutableSegment.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
}
// ImmutableSegment Deep overhead
cl = ImmutableSegment.class;
actual = ImmutableSegment.DEEP_OVERHEAD_CSLM;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
}
actual = ImmutableSegment.DEEP_OVERHEAD_CAM;
expected = ClassSize.estimateBase(cl, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(CellArrayMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(CellArrayMap.class, true);
assertEquals(expected, actual);
}
// Store Overhead // Store Overhead
cl = HStore.class; cl = HStore.class;
actual = HStore.FIXED_OVERHEAD; actual = HStore.FIXED_OVERHEAD;

View File

@ -377,7 +377,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
private long runSnapshot(final AbstractMemStore hmc, boolean useForce) private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
throws IOException { throws IOException {
// Save off old state. // Save off old state.
long oldHistorySize = hmc.getSnapshot().getSize(); long oldHistorySize = hmc.getSnapshot().keySize();
long prevTimeStamp = hmc.timeOfOldestEdit(); long prevTimeStamp = hmc.timeOfOldestEdit();
hmc.snapshot(); hmc.snapshot();
@ -547,10 +547,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf1, 3, val)); memstore.add(new KeyValue(row, fam, qf1, 3, val));
assertEquals(3, memstore.getActive().getCellsCount()); assertEquals(3, memstore.getActive().getCellsCount());
while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertTrue(chunkPool.getPoolSize() == 0); assertTrue(chunkPool.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners; // Chunks will be put back to pool after close scanners;
@ -597,9 +593,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long size = memstore.getFlushableSize(); long size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
@ -625,9 +618,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
long size = memstore.getFlushableSize(); long size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(1000);
}
int counter = 0; int counter = 0;
for ( Segment s : memstore.getSegments()) { for ( Segment s : memstore.getSegments()) {
counter += s.getCellsCount(); counter += s.getCellsCount();
@ -641,9 +631,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
size = memstore.getFlushableSize(); size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize()); assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
@ -672,9 +659,6 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
String tstStr = "\n\nFlushable size after first flush in memory:" + size String tstStr = "\n\nFlushable size after first flush in memory:" + size
+ ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory(); + ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
assertEquals(0, memstore.getSnapshot().getCellsCount()); assertEquals(0, memstore.getSnapshot().getCellsCount());
assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize()); assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
@ -719,7 +703,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) { private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
long size = hmc.getActive().getSize(); long size = hmc.getActive().keySize();
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts Threads.sleep(1); // to make sure each kv gets a different ts
@ -729,7 +713,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
hmc.add(kv); hmc.add(kv);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
} }
regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size); regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().keySize() - size);
} }
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {

View File

@ -333,7 +333,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) { private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily"); byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier"); byte[] qf = Bytes.toBytes("testqualifier");
long size = hmc.getActive().getSize();// long size = hmc.getActive().size();//
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
Threads.sleep(1); // to make sure each kv gets a different ts Threads.sleep(1); // to make sure each kv gets a different ts
@ -343,7 +343,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
hmc.add(kv); hmc.add(kv);
LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp()); LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
} }
regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().getSize() - size);// regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);//
} }
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {

View File

@ -172,8 +172,9 @@ public class TestPerColumnFamilyFlush {
// The total memstore size should be the same as the sum of the sizes of // The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize assertEquals(
+ cf2MemstoreSize + cf3MemstoreSize); totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
// Flush! // Flush!
region.flush(false); region.flush(false);
@ -192,7 +193,7 @@ public class TestPerColumnFamilyFlush {
// We should have cleared out only CF1, since we chose the flush thresholds // We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly. // and number of puts accordingly.
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
// Nothing should have happened to CF2, ... // Nothing should have happened to CF2, ...
assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
// ... or CF3 // ... or CF3
@ -201,8 +202,9 @@ public class TestPerColumnFamilyFlush {
// LSN in the memstore of CF2. // LSN in the memstore of CF2.
assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
// Of course, this should hold too. // Of course, this should hold too.
assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize assertEquals(
+ cf3MemstoreSize); totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
cf2MemstoreSize + cf3MemstoreSize);
// Now add more puts (mostly for CF2), so that we only flush CF2 this time. // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
for (int i = 1200; i < 2400; i++) { for (int i = 1200; i < 2400; i++) {
@ -229,11 +231,12 @@ public class TestPerColumnFamilyFlush {
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// CF1 and CF2, both should be absent. // CF1 and CF2, both should be absent.
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); assertEquals(totalMemstoreSize + (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf3MemstoreSize);
assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3); assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
// What happens when we hit the memstore limit, but we are not able to find // What happens when we hit the memstore limit, but we are not able to find
@ -296,8 +299,9 @@ public class TestPerColumnFamilyFlush {
// The total memstore size should be the same as the sum of the sizes of // The total memstore size should be the same as the sum of the sizes of
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize assertEquals(
+ cf2MemstoreSize + cf3MemstoreSize); totalMemstoreSize + (3 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize);
// Flush! // Flush!
region.flush(false); region.flush(false);
@ -310,9 +314,9 @@ public class TestPerColumnFamilyFlush {
region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
// Everything should have been cleared // Everything should have been cleared
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf2MemstoreSize);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf3MemstoreSize);
assertEquals(0, totalMemstoreSize); assertEquals(0, totalMemstoreSize);
assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
@ -379,12 +383,13 @@ public class TestPerColumnFamilyFlush {
cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
// CF1 Should have been flushed // CF1 Should have been flushed
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSize);
// CF2 and CF3 shouldn't have been flushed. // CF2 and CF3 shouldn't have been flushed.
assertTrue(cf2MemstoreSize > 0); assertTrue(cf2MemstoreSize > 0);
assertTrue(cf3MemstoreSize > 0); assertTrue(cf3MemstoreSize > 0);
assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize assertEquals(
+ cf3MemstoreSize); totalMemstoreSize + (2 * (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)),
cf2MemstoreSize + cf3MemstoreSize);
// Wait for the RS report to go across to the master, so that the master // Wait for the RS report to go across to the master, so that the master
// is aware of which sequence ids have been flushed, before we kill the RS. // is aware of which sequence ids have been flushed, before we kill the RS.
@ -521,12 +526,12 @@ public class TestPerColumnFamilyFlush {
}); });
LOG.info("Finished waiting on flush after too many WALs..."); LOG.info("Finished waiting on flush after too many WALs...");
// Individual families should have been flushed. // Individual families should have been flushed.
assertEquals(DefaultMemStore.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY1).getMemStoreSize()); desiredRegion.getStore(FAMILY1).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY2).getMemStoreSize()); desiredRegion.getStore(FAMILY2).getMemStoreSize());
assertEquals(DefaultMemStore.DEEP_OVERHEAD, assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
desiredRegion.getStore(FAMILY3).getMemStoreSize()); desiredRegion.getStore(FAMILY3).getMemStoreSize());
// let WAL cleanOldLogs // let WAL cleanOldLogs
assertNull(getWAL(desiredRegion).rollWriter(true)); assertNull(getWAL(desiredRegion).rollWriter(true));
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);

View File

@ -19,10 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
@ -31,19 +28,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -59,7 +51,6 @@ import static org.junit.Assert.assertTrue;
@Category({ RegionServerTests.class, LargeTests.class }) @Category({ RegionServerTests.class, LargeTests.class })
public class TestWalAndCompactingMemStoreFlush { public class TestWalAndCompactingMemStoreFlush {
private static final Log LOG = LogFactory.getLog(TestWalAndCompactingMemStoreFlush.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush", public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
@ -201,12 +192,13 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize + String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
" DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM " CompactingMemStore.DEEP_OVERHEAD="+CompactingMemStore.DEEP_OVERHEAD +
+
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, assertEquals(msg,
totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!!!!!!!!!!!!!!!!!!!!!! // Flush!!!!!!!!!!!!!!!!!!!!!!
@ -220,11 +212,6 @@ public class TestWalAndCompactingMemStoreFlush {
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
region.flush(false); region.flush(false);
// CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
// Recalculate everything // Recalculate everything
long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@ -239,8 +226,6 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+ "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+ "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
+ ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
@ -249,12 +234,13 @@ public class TestWalAndCompactingMemStoreFlush {
assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI); assertTrue(cf1MemstoreSizePhaseII < cf1MemstoreSizePhaseI);
// CF2 should become empty // CF2 should become empty
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was compacted (this is approximation check) // verify that CF3 was flushed to memory and was compacted (this is approximation check)
assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM > + ImmutableSegment.DEEP_OVERHEAD_CAM
cf3MemstoreSizePhaseII); + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
// CF3 was compacted and flattened! // CF3 was compacted and flattened!
assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI assertTrue("\n<<< Size of CF3 in phase I - " + cf3MemstoreSizePhaseI
@ -315,7 +301,8 @@ public class TestWalAndCompactingMemStoreFlush {
// CF1's pipeline component (inserted before first flush) should be flushed to disk // CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk // CF2 should be flushed to disk
assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV); assertTrue(cf1MemstoreSizePhaseIII > cf1MemstoreSizePhaseIV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseIV);
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@ -340,12 +327,16 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertTrue(DefaultMemStore.DEEP_OVERHEAD < cf1MemstoreSizePhaseV); assertTrue(
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD < cf1MemstoreSizePhaseV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseV);
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf3MemstoreSizePhaseV);
region.flush(true); // flush once again in order to be sure that everything is empty region.flush(true); // flush once again in order to be sure that everything is empty
assertEquals(DefaultMemStore.DEEP_OVERHEAD, region.getStore(FAMILY1).getMemStoreSize()); assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
region.getStore(FAMILY1).getMemStoreSize());
// What happens when we hit the memstore limit, but we are not able to find // What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold? // any Column Family above the threshold?
@ -453,12 +444,12 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize + String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
" DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
assertEquals(msg,totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, assertEquals(msg,
totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!!!!!!!!!!!!!!!!!!!!!! // Flush!!!!!!!!!!!!!!!!!!!!!!
@ -472,11 +463,6 @@ public class TestWalAndCompactingMemStoreFlush {
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
region.flush(false); region.flush(false);
// CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
// Recalculate everything // Recalculate everything
long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize(); long cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@ -491,24 +477,23 @@ public class TestWalAndCompactingMemStoreFlush {
s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD s = s + "DefaultMemStore DEEP_OVERHEAD is:" + DefaultMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD + ", CompactingMemStore DEEP_OVERHEAD is:" + CompactingMemStore.DEEP_OVERHEAD
+ ", CompactingMemStore DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM is:" + CompactingMemStore
.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+ "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n" + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
+ "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
+ ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n"; + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
// CF1 was flushed to memory, but there is nothing to compact, should // CF1 was flushed to memory, but there is nothing to compact, should
// remain the same size plus renewed empty skip-list // remain the same size plus renewed empty skip-list
assertEquals(s, cf1MemstoreSizePhaseII, assertEquals(s, cf1MemstoreSizePhaseII, cf1MemstoreSizePhaseI
cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM); + ImmutableSegment.DEEP_OVERHEAD_CAM + CompactionPipeline.ENTRY_OVERHEAD);
// CF2 should become empty // CF2 should become empty
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseII);
// verify that CF3 was flushed to memory and was compacted (this is approximation check) // verify that CF3 was flushed to memory and was compacted (this is approximation check)
assertTrue(cf3MemstoreSizePhaseI/2+DefaultMemStore.DEEP_OVERHEAD + assertTrue(cf3MemstoreSizePhaseI / 2 + CompactingMemStore.DEEP_OVERHEAD
CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM > + ImmutableSegment.DEEP_OVERHEAD_CAM
cf3MemstoreSizePhaseII); + CompactionPipeline.ENTRY_OVERHEAD > cf3MemstoreSizePhaseII);
assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII); assertTrue(cf3MemstoreSizePhaseI/2 < cf3MemstoreSizePhaseII);
@ -564,9 +549,10 @@ public class TestWalAndCompactingMemStoreFlush {
// CF1's pipeline component (inserted before first flush) should be flushed to disk // CF1's pipeline component (inserted before first flush) should be flushed to disk
// CF2 should be flushed to disk // CF2 should be flushed to disk
assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + DefaultMemStore.DEEP_OVERHEAD, assertEquals(cf1MemstoreSizePhaseIII - cf1MemstoreSizePhaseI + CompactingMemStore.DEEP_OVERHEAD
cf1MemstoreSizePhaseIV); + MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseIV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseIV);
// CF3 shouldn't have been touched. // CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII); assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
@ -590,9 +576,12 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region) long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
.getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSizePhaseV); assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseV); cf1MemstoreSizePhaseV);
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSizePhaseV); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseV);
assertEquals(CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf3MemstoreSizePhaseV);
// Because there is nothing in any memstore the WAL's LSN should be -1 // Because there is nothing in any memstore the WAL's LSN should be -1
assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM); assertEquals(smallestSeqInRegionCurrentMemstorePhaseV, HConstants.NO_SEQNUM);
@ -672,24 +661,17 @@ public class TestWalAndCompactingMemStoreFlush {
// memstores of CF1, CF2 and CF3. // memstores of CF1, CF2 and CF3.
String msg = "totalMemstoreSize="+totalMemstoreSize + String msg = "totalMemstoreSize="+totalMemstoreSize +
" DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD + " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
" DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM="+CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_SKIPLIST_ITEM
+
" cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI + " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
" cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI + " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
" cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ; " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
assertEquals(msg, totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, assertEquals(msg,
totalMemstoreSize + 2 * (CompactingMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD)
+ (DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD),
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI); cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush! // Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
region.flush(false); region.flush(false);
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize(); long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@ -701,7 +683,8 @@ public class TestWalAndCompactingMemStoreFlush {
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3); long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
// CF2 should have been cleared // CF2 should have been cleared
assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSizePhaseII); assertEquals(DefaultMemStore.DEEP_OVERHEAD + MutableSegment.DEEP_OVERHEAD,
cf2MemstoreSizePhaseII);
String s = "\n\n----------------------------------\n" String s = "\n\n----------------------------------\n"
+ "Upon initial insert and flush, LSN of CF1 is:" + "Upon initial insert and flush, LSN of CF1 is:"
@ -739,13 +722,6 @@ public class TestWalAndCompactingMemStoreFlush {
// Flush! // Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory(); ((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
// CF1 and CF3 should be compacted so wait here to be sure the compaction is done
while (((CompactingMemStore) region.getStore(FAMILY1).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
while (((CompactingMemStore) region.getStore(FAMILY3).getMemStore())
.isMemStoreFlushingInMemory())
Threads.sleep(10);
region.flush(false); region.flush(false);
long smallestSeqInRegionCurrentMemstorePhaseIV = long smallestSeqInRegionCurrentMemstorePhaseIV =
@ -768,24 +744,7 @@ public class TestWalAndCompactingMemStoreFlush {
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
// Find the (first) region which has the specified name.
private static Pair<Region, HRegionServer> getRegionWithName(TableName tableName) {
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = rsts.get(i).getRegionServer();
for (Region region : hrs.getOnlineRegions(tableName)) {
return Pair.newPair(region, hrs);
}
}
return null;
}
private WAL getWAL(Region region) { private WAL getWAL(Region region) {
return ((HRegion)region).getWAL(); return ((HRegion)region).getWAL();
} }
private int getNumRolledLogFiles(Region region) {
return ((FSHLog)getWAL(region)).getNumRolledLogFiles();
}
} }