HBASE-18966 Use non-sync TimeRangeTracker as a replacement for TimeRange in ImmutableSegment

This commit is contained in:
Chia-Ping Tsai 2017-10-14 23:09:58 +08:00
parent 104595137e
commit a63d79837d
9 changed files with 133 additions and 77 deletions

View File

@ -132,6 +132,9 @@ public class ClassSize {
/** Overhead for SyncTimeRangeTracker */
public static final int SYNC_TIMERANGE_TRACKER;
/** Overhead for NonSyncTimeRangeTracker */
public static final int NON_SYNC_TIMERANGE_TRACKER;
/** Overhead for CellSkipListSet */
public static final int CELL_SET;
@ -327,6 +330,8 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG);
CELL_SET = align(OBJECT + REFERENCE);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);

View File

@ -42,17 +42,11 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
public class CompositeImmutableSegment extends ImmutableSegment {
private final List<ImmutableSegment> segments;
// CompositeImmutableSegment is used for snapshots and snapshot should
// support getTimeRangeTracker() interface.
// Thus we hold a constant TRT build in the construction time from TRT of the given segments.
private final TimeRangeTracker timeRangeTracker;
private long keySize = 0;
public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
super(comparator);
this.segments = segments;
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());
@ -127,11 +121,6 @@ public class CompositeImmutableSegment extends ImmutableSegment {
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override
public long getMinTimestamp(){
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
* Creates the scanner for the given read point
* @return a scanner for the given read point

View File

@ -35,15 +35,7 @@ import java.util.List;
@InterfaceAudience.Private
public abstract class ImmutableSegment extends Segment {
public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ ClassSize.align(ClassSize.REFERENCE // Referent to timeRange
+ ClassSize.TIMERANGE);
/**
* This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
* TimeRangeTracker with all its synchronization when doing time range stuff.
*/
private final TimeRange timeRange;
public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + ClassSize.NON_SYNC_TIMERANGE_TRACKER;
// each sub-type of immutable segment knows whether it is flat or not
protected abstract boolean canBeFlattened();
@ -53,16 +45,14 @@ public abstract class ImmutableSegment extends Segment {
* Empty C-tor to be used only for CompositeImmutableSegment
*/
protected ImmutableSegment(CellComparator comparator) {
super(comparator);
this.timeRange = null;
super(comparator, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC));
}
/**------------------------------------------------------------------------
* C-tor to be used to build the derived classes
*/
protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cs, comparator, memStoreLAB);
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
super(cs, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC));
}
/**------------------------------------------------------------------------
@ -72,21 +62,10 @@ public abstract class ImmutableSegment extends Segment {
*/
protected ImmutableSegment(Segment segment) {
super(segment);
this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
}
///////////////////// PUBLIC METHODS /////////////////////
@Override
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
return this.timeRange.includesTimeRange(tr) &&
this.timeRange.getMax() >= oldestUnexpiredTS;
}
@Override
public long getMinTimestamp() {
return this.timeRange.getMin();
}
public int getNumOfSegments() {
return 1;

View File

@ -38,10 +38,12 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
@InterfaceAudience.Private
public class MutableSegment extends Segment {
public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
public final static long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+ ClassSize.CONCURRENT_SKIPLISTMAP
+ ClassSize.SYNC_TIMERANGE_TRACKER;
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
super(cellSet, comparator, memStoreLAB);
super(cellSet, comparator, memStoreLAB, TimeRangeTracker.create(TimeRangeTracker.Type.SYNC));
incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
}
@ -112,17 +114,6 @@ public class MutableSegment extends Segment {
return this.getCellSet().first();
}
@Override
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
return (this.timeRangeTracker.includesTimeRange(tr)
&& (this.timeRangeTracker.getMax() >= oldestUnexpiredTS));
}
@Override
public long getMinTimestamp() {
return this.timeRangeTracker.getMin();
}
@Override protected long indexEntrySize() {
return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
}

View File

@ -54,7 +54,7 @@ public abstract class Segment {
+ Bytes.SIZEOF_LONG // minSequenceId
+ Bytes.SIZEOF_BOOLEAN); // tagsPresent
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG + ClassSize.SYNC_TIMERANGE_TRACKER;
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG;
private AtomicReference<CellSet> cellSet= new AtomicReference<>();
private final CellComparator comparator;
@ -69,15 +69,15 @@ public abstract class Segment {
// Empty constructor to be used when Segment is used as interface,
// and there is no need in true Segments state
protected Segment(CellComparator comparator) {
protected Segment(CellComparator comparator, TimeRangeTracker trt) {
this.comparator = comparator;
this.dataSize = new AtomicLong(0);
this.heapSize = new AtomicLong(0);
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
this.timeRangeTracker = trt;
}
// This constructor is used to create empty Segments.
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, TimeRangeTracker trt) {
this.cellSet.set(cellSet);
this.comparator = comparator;
this.minSequenceId = Long.MAX_VALUE;
@ -85,7 +85,7 @@ public abstract class Segment {
this.dataSize = new AtomicLong(0);
this.heapSize = new AtomicLong(0);
this.tagsPresent = false;
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
this.timeRangeTracker = trt;
}
protected Segment(Segment segment) {
@ -177,9 +177,11 @@ public abstract class Segment {
return KeyValueUtil.length(cell);
}
public abstract boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS);
public abstract long getMinTimestamp();
public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
return !isEmpty()
&& (tr.isAllTime() || timeRangeTracker.includesTimeRange(tr))
&& timeRangeTracker.getMax() >= oldestUnexpiredTS;
}
public boolean isTagsPresent() {
return tagsPresent;
@ -354,7 +356,8 @@ public abstract class Segment {
res += "cellsCount "+getCellsCount()+"; ";
res += "cellsSize "+keySize()+"; ";
res += "totalHeapSize "+heapSize()+"; ";
res += "Min ts "+getMinTimestamp()+"; ";
res += "Min ts " + timeRangeTracker.getMin() + "; ";
res += "Max ts " + timeRangeTracker.getMax() + "; ";
return res;
}
}

View File

@ -229,7 +229,9 @@ public abstract class TimeRangeTracker implements Writable {
return new TimeRange(min, max);
}
private static class NonSyncTimeRangeTracker extends TimeRangeTracker {
@VisibleForTesting
//In order to estimate the heap size, this inner class need to be accessible to TestHeapSize.
public static class NonSyncTimeRangeTracker extends TimeRangeTracker {
private long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
private long maximumTimestamp = INITIAL_MAX_TIMESTAMP;

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.ImmutableSegment;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
import org.apache.hadoop.hbase.regionserver.MutableSegment;
import org.apache.hadoop.hbase.regionserver.Segment;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -249,7 +250,7 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
// TimeRangeTracker
// SyncTimeRangeTracker
cl = SyncTimeRangeTracker.class;
expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.SYNC_TIMERANGE_TRACKER;
@ -258,6 +259,15 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
// NonSyncTimeRangeTracker
cl = NonSyncTimeRangeTracker.class;
expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.NON_SYNC_TIMERANGE_TRACKER;
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
// CellSet
cl = CellSet.class;
expected = ClassSize.estimateBase(cl, false);
@ -361,13 +371,11 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
assertEquals(expected, actual);
}
@ -398,16 +406,14 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
assertEquals(expected, actual);
}
@ -417,8 +423,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
@ -426,8 +431,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
}
@ -437,8 +441,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(NonSyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(CellArrayMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
@ -446,8 +449,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(NonSyncTimeRangeTracker.class, true);
ClassSize.estimateBase(CellArrayMap.class, true);
assertEquals(expected, actual);
}

View File

@ -350,6 +350,89 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
memstore.clearSnapshot(snapshot.getId());
}
@Test
public void testTimeRangeAfterCompaction() throws IOException {
if (toCellChunkMap) {
// set memstore to flat into CellChunkMap
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
}
testTimeRange(true);
}
@Test
public void testTimeRangeAfterMerge() throws IOException {
if (toCellChunkMap) {
// set memstore to flat into CellChunkMap
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();
}
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
((CompactingMemStore)memstore).initiateType(compactionType);
testTimeRange(false);
}
private void testTimeRange(boolean isCompaction) throws IOException {
final long initTs = 100;
long currentTs = initTs;
byte[] row = Bytes.toBytes("row");
byte[] family = Bytes.toBytes("family");
byte[] qf1 = Bytes.toBytes("qf1");
// first segment in pipeline
this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
long minTs = currentTs;
this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
long numberOfCell = 2;
assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
assertEquals(minTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
((CompactingMemStore) memstore).flushInMemory();
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
if (isCompaction) {
// max version = 1, so one cell will be dropped.
numberOfCell = 1;
minTs = currentTs;
}
// second segment in pipeline
this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
numberOfCell += 2;
assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
assertEquals(minTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
((CompactingMemStore) memstore).flushInMemory(); // trigger the merge
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
Threads.sleep(10);
}
if (isCompaction) {
// max version = 1, so one cell will be dropped.
numberOfCell = 1;
minTs = currentTs;
}
assertEquals(numberOfCell, memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
assertEquals(minTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
}
@Test
public void testCountOfCellsAfterFlatteningByScan() throws IOException {
String[] keys1 = { "A", "B", "C" }; // A, B, C

View File

@ -1083,19 +1083,21 @@ public class TestHStore {
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare();
inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
// snaptshot + active (if it isn't empty)
assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s));
// snapshot + active (if inputCellsAfterSnapshot isn't empty)
assertEquals(numberOfMemScannersBeforeFlush, countMemStoreScanner(s));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
// snapshot has no data after flush
int numberOfMemScannersAfterFlush = inputCellsAfterSnapshot.isEmpty() ? 0 : 1;
boolean more;
int cellCount = 0;
do {
List<Cell> cells = new ArrayList<>();
more = s.next(cells);
cellCount += cells.size();
assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s));
assertEquals(more ? numberOfMemScannersAfterFlush : 0, countMemStoreScanner(s));
} while (more);
assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+ ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),