HBASE-18753 Introduce the unsynchronized TimeRangeTracker

This commit is contained in:
Chia-Ping Tsai 2017-10-01 16:59:35 +08:00
parent dd3d7de018
commit 9f1bfbeaab
11 changed files with 380 additions and 222 deletions

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.ConcurrentHashMap;
@ -30,6 +29,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* Class for determining the "size" of a class, an attempt to calculate the
@ -128,8 +129,8 @@ public class ClassSize {
/** Overhead for timerange */
public static final int TIMERANGE;
/** Overhead for TimeRangeTracker */
public static final int TIMERANGE_TRACKER;
/** Overhead for SyncTimeRangeTracker */
public static final int SYNC_TIMERANGE_TRACKER;
/** Overhead for CellSkipListSet */
public static final int CELL_SET;
@ -325,7 +326,7 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
CELL_SET = align(OBJECT + REFERENCE);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);

View File

@ -399,7 +399,7 @@ public class TestHFileOutputFormat2 {
assertNotNull(range);
// unmarshall and check values.
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
Writables.copyWritable(range, timeRangeTracker);
LOG.info(timeRangeTracker.getMin() +
"...." + timeRangeTracker.getMax());

View File

@ -18,19 +18,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* The CompositeImmutableSegments is created as a collection of ImmutableSegments and supports
* the interface of a single ImmutableSegments.
@ -51,7 +52,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
public CompositeImmutableSegment(CellComparator comparator, List<ImmutableSegment> segments) {
super(comparator);
this.segments = segments;
this.timeRangeTracker = new TimeRangeTracker();
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
for (ImmutableSegment s : segments) {
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMax());
this.timeRangeTracker.includeTimestamp(s.getTimeRangeTracker().getMin());

View File

@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -54,7 +54,7 @@ public abstract class Segment {
+ Bytes.SIZEOF_LONG // minSequenceId
+ Bytes.SIZEOF_BOOLEAN); // tagsPresent
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.ATOMIC_REFERENCE
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER;
+ ClassSize.CELL_SET + 2 * ClassSize.ATOMIC_LONG + ClassSize.SYNC_TIMERANGE_TRACKER;
private AtomicReference<CellSet> cellSet= new AtomicReference<>();
private final CellComparator comparator;
@ -73,7 +73,7 @@ public abstract class Segment {
this.comparator = comparator;
this.dataSize = new AtomicLong(0);
this.heapSize = new AtomicLong(0);
this.timeRangeTracker = new TimeRangeTracker();
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
}
// This constructor is used to create empty Segments.
@ -85,7 +85,7 @@ public abstract class Segment {
this.dataSize = new AtomicLong(0);
this.heapSize = new AtomicLong(0);
this.tagsPresent = false;
this.timeRangeTracker = new TimeRangeTracker();
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
}
protected Segment(Segment segment) {

View File

@ -131,7 +131,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
// TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when
// it no longer writable.
this.timeRangeTrackerSet = trt != null;
this.timeRangeTracker = this.timeRangeTrackerSet? trt: new TimeRangeTracker();
this.timeRangeTracker = this.timeRangeTrackerSet? trt: TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
// TODO : Change all writers to be specifically created for compaction context
writer = HFile.getWriterFactory(conf, cacheConf)
.withPath(fs, path)

View File

@ -25,17 +25,17 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* Stores minimum and maximum timestamp values, it is [minimumTimestamp, maximumTimestamp] in
* interval notation.
* Use this class at write-time ONLY. Too much synchronization to use at read time
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
* updates but then later we can make one as part of a compaction when there is only one thread
* involved -- consider making different version, the synchronized and the unsynchronized).
* Use {@link TimeRange} at read time instead of this. See toTimeRange() to make TimeRange to use.
* MemStores use this class to track minimum and maximum timestamps. The TimeRangeTracker made by
* the MemStore is passed to the StoreFile for it to write out as part a flush in the the file
@ -44,33 +44,55 @@ import org.apache.hadoop.io.Writable;
* at read time via an instance of {@link TimeRange} to test if Cells fit the StoreFile TimeRange.
*/
@InterfaceAudience.Private
public class TimeRangeTracker implements Writable {
public abstract class TimeRangeTracker implements Writable {
public enum Type {
// thread-unsafe
NON_SYNC,
// thread-safe
SYNC
}
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
static final long INITIAL_MAX_TIMESTAMP = -1L;
AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
/**
* Default constructor.
* Initializes TimeRange to be null
*/
public TimeRangeTracker() {}
/**
* Copy Constructor
* @param trt source TimeRangeTracker
*/
public TimeRangeTracker(final TimeRangeTracker trt) {
minimumTimestamp.set(trt.getMin());
maximumTimestamp.set(trt.getMax());
public static TimeRangeTracker create(Type type) {
switch (type) {
case NON_SYNC:
return new NonSyncTimeRangeTracker();
case SYNC:
return new SyncTimeRangeTracker();
default:
throw new UnsupportedOperationException("The type:" + type + " is unsupported");
}
}
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
this.minimumTimestamp.set(minimumTimestamp);
this.maximumTimestamp.set(maximumTimestamp);
public static TimeRangeTracker create(Type type, TimeRangeTracker trt) {
switch (type) {
case NON_SYNC:
return new NonSyncTimeRangeTracker(trt);
case SYNC:
return new SyncTimeRangeTracker(trt);
default:
throw new UnsupportedOperationException("The type:" + type + " is unsupported");
}
}
public static TimeRangeTracker create(Type type, long minimumTimestamp, long maximumTimestamp) {
switch (type) {
case NON_SYNC:
return new NonSyncTimeRangeTracker(minimumTimestamp, maximumTimestamp);
case SYNC:
return new SyncTimeRangeTracker(minimumTimestamp, maximumTimestamp);
default:
throw new UnsupportedOperationException("The type:" + type + " is unsupported");
}
}
protected abstract void setMax(long ts);
protected abstract void setMin(long ts);
protected abstract boolean compareAndSetMin(long expect, long update);
protected abstract boolean compareAndSetMax(long expect, long update);
/**
* Update the current TimestampRange to include the timestamp from <code>cell</code>.
* If the Key is of type DeleteColumn or DeleteFamily, it includes the
@ -91,12 +113,12 @@ public class TimeRangeTracker implements Writable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
justification="Intentional")
void includeTimestamp(final long timestamp) {
long initialMinTimestamp = this.minimumTimestamp.get();
long initialMinTimestamp = getMin();
if (timestamp < initialMinTimestamp) {
long curMinTimestamp = initialMinTimestamp;
while (timestamp < curMinTimestamp) {
if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
curMinTimestamp = this.minimumTimestamp.get();
if (!compareAndSetMin(curMinTimestamp, timestamp)) {
curMinTimestamp = getMin();
} else {
// successfully set minimumTimestamp, break.
break;
@ -120,12 +142,12 @@ public class TimeRangeTracker implements Writable {
}
}
long curMaxTimestamp = this.maximumTimestamp.get();
long curMaxTimestamp = getMax();
if (timestamp > curMaxTimestamp) {
while (timestamp > curMaxTimestamp) {
if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
curMaxTimestamp = this.maximumTimestamp.get();
if (!compareAndSetMax(curMaxTimestamp, timestamp)) {
curMaxTimestamp = getMax();
} else {
// successfully set maximumTimestamp, break
break;
@ -140,48 +162,43 @@ public class TimeRangeTracker implements Writable {
* @return True if there is overlap, false otherwise
*/
public boolean includesTimeRange(final TimeRange tr) {
return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
return (getMin() < tr.getMax() && getMax() >= tr.getMin());
}
/**
* @return the minimumTimestamp
*/
public long getMin() {
return minimumTimestamp.get();
}
public abstract long getMin();
/**
* @return the maximumTimestamp
*/
public long getMax() {
return maximumTimestamp.get();
}
public abstract long getMax();
public void write(final DataOutput out) throws IOException {
out.writeLong(minimumTimestamp.get());
out.writeLong(maximumTimestamp.get());
out.writeLong(getMin());
out.writeLong(getMax());
}
public void readFields(final DataInput in) throws IOException {
this.minimumTimestamp.set(in.readLong());
this.maximumTimestamp.set(in.readLong());
setMin(in.readLong());
setMax(in.readLong());
}
@Override
public String toString() {
return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
return "[" + getMin() + "," + getMax() + "]";
}
/**
* @return An instance of TimeRangeTracker filled w/ the content of serialized
* TimeRangeTracker in <code>timeRangeTrackerBytes</code>.
* @return An instance of NonSyncTimeRangeTracker filled w/ the content of serialized
* NonSyncTimeRangeTracker in <code>timeRangeTrackerBytes</code>.
* @throws IOException
*/
public static TimeRangeTracker getTimeRangeTracker(final byte [] timeRangeTrackerBytes)
throws IOException {
if (timeRangeTrackerBytes == null) return null;
TimeRangeTracker trt = new TimeRangeTracker();
TimeRangeTracker trt = TimeRangeTracker.create(Type.NON_SYNC);
Writables.copyWritable(timeRangeTrackerBytes, trt);
return trt;
}
@ -211,4 +228,110 @@ public class TimeRangeTracker implements Writable {
}
return new TimeRange(min, max);
}
private static class NonSyncTimeRangeTracker extends TimeRangeTracker {
private long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
private long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
NonSyncTimeRangeTracker() {
}
NonSyncTimeRangeTracker(final TimeRangeTracker trt) {
this.minimumTimestamp = trt.getMin();
this.maximumTimestamp = trt.getMax();
}
NonSyncTimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
this.minimumTimestamp = minimumTimestamp;
this.maximumTimestamp = maximumTimestamp;
}
@Override
protected void setMax(long ts) {
maximumTimestamp = ts;
}
@Override
protected void setMin(long ts) {
minimumTimestamp = ts;
}
@Override
protected boolean compareAndSetMin(long expect, long update) {
if (minimumTimestamp != expect) {
return false;
}
minimumTimestamp = update;
return true;
}
@Override
protected boolean compareAndSetMax(long expect, long update) {
if (maximumTimestamp != expect) {
return false;
}
maximumTimestamp = update;
return true;
}
@Override
public long getMin() {
return minimumTimestamp;
}
@Override
public long getMax() {
return maximumTimestamp;
}
}
@VisibleForTesting
//In order to estimate the heap size, this inner class need to be accessible to TestHeapSize.
public static class SyncTimeRangeTracker extends TimeRangeTracker {
private final AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
private final AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
private SyncTimeRangeTracker() {
}
SyncTimeRangeTracker(final TimeRangeTracker trt) {
this.minimumTimestamp.set(trt.getMin());
this.maximumTimestamp.set(trt.getMax());
}
SyncTimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
this.minimumTimestamp.set(minimumTimestamp);
this.maximumTimestamp.set(maximumTimestamp);
}
@Override
protected void setMax(long ts) {
maximumTimestamp.set(ts);
}
@Override
protected void setMin(long ts) {
minimumTimestamp.set(ts);
}
@Override
protected boolean compareAndSetMin(long expect, long update) {
return minimumTimestamp.compareAndSet(expect, update);
}
@Override
protected boolean compareAndSetMax(long expect, long update) {
return maximumTimestamp.compareAndSet(expect, update);
}
@Override
public long getMin() {
return minimumTimestamp.get();
}
@Override
public long getMax() {
return maximumTimestamp.get();
}
}
}

View File

@ -19,22 +19,9 @@
package org.apache.hadoop.hbase.io;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
import org.apache.hadoop.hbase.regionserver.*;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@ -53,8 +40,35 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
import org.apache.hadoop.hbase.regionserver.CSLMImmutableSegment;
import org.apache.hadoop.hbase.regionserver.CellArrayImmutableSegment;
import org.apache.hadoop.hbase.regionserver.CellArrayMap;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.CompactionPipeline;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.ImmutableSegment;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactor;
import org.apache.hadoop.hbase.regionserver.MutableSegment;
import org.apache.hadoop.hbase.regionserver.Segment;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Testing the sizing that HeapSize offers and compares to the size given by
@ -236,9 +250,9 @@ public class TestHeapSize {
}
// TimeRangeTracker
cl = TimeRangeTracker.class;
cl = SyncTimeRangeTracker.class;
expected = ClassSize.estimateBase(cl, false);
actual = ClassSize.TIMERANGE_TRACKER;
actual = ClassSize.SYNC_TIMERANGE_TRACKER;
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
@ -347,13 +361,13 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
assertEquals(expected, actual);
}
@ -364,7 +378,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
@ -372,7 +386,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
}
@ -384,7 +398,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
@ -392,7 +406,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
assertEquals(expected, actual);
}
@ -403,7 +417,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
if (expected != actual) {
@ -412,7 +426,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
assertEquals(expected, actual);
@ -423,7 +437,7 @@ public class TestHeapSize {
expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(AtomicReference.class, false);
expected += ClassSize.estimateBase(CellSet.class, false);
expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
expected += ClassSize.estimateBase(SyncTimeRangeTracker.class, false);
expected += ClassSize.estimateBase(TimeRange.class, false);
expected += ClassSize.estimateBase(CellArrayMap.class, false);
if (expected != actual) {
@ -432,7 +446,7 @@ public class TestHeapSize {
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(AtomicReference.class, true);
ClassSize.estimateBase(CellSet.class, true);
ClassSize.estimateBase(TimeRangeTracker.class, true);
ClassSize.estimateBase(SyncTimeRangeTracker.class, true);
ClassSize.estimateBase(TimeRange.class, true);
ClassSize.estimateBase(CellArrayMap.class, true);
assertEquals(expected, actual);

View File

@ -49,7 +49,7 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
for (int i = 0; i < sizes.length; i++) {
MockHStoreFile msf =
new MockHStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
msf.setTimeRangeTracker(new TimeRangeTracker(minTimestamps[i], maxTimestamps[i]));
msf.setTimeRangeTracker(TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, minTimestamps[i], maxTimestamps[i]));
ret.add(msf);
}
return ret;

View File

@ -168,7 +168,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
for (HStoreFile file : candidates) {
if (file instanceof MockHStoreFile) {
MockHStoreFile mockFile = (MockHStoreFile) file;
mockFile.setTimeRangeTracker(new TimeRangeTracker(-1, -1));
mockFile.setTimeRangeTracker(TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, -1, -1));
mockFile.setEntries(0);
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Writables;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestSimpleTimeRangeTracker {
protected TimeRangeTracker getTimeRangeTracker() {
return TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
}
protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
return TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, min, max);
}
@Test
public void testExtreme() {
TimeRange tr = new TimeRange();
assertTrue(tr.includesTimeRange(new TimeRange()));
TimeRangeTracker trt = getTimeRangeTracker();
assertFalse(trt.includesTimeRange(new TimeRange()));
trt.includeTimestamp(1);
trt.includeTimestamp(10);
assertTrue(trt.includesTimeRange(new TimeRange()));
}
@Test
public void testTimeRangeInitialized() {
TimeRangeTracker src = getTimeRangeTracker();
TimeRange tr = new TimeRange(System.currentTimeMillis());
assertFalse(src.includesTimeRange(tr));
}
@Test
public void testTimeRangeTrackerNullIsSameAsTimeRangeNull() throws IOException {
TimeRangeTracker src = getTimeRangeTracker(1, 2);
byte [] bytes = Writables.getBytes(src);
TimeRange tgt = TimeRangeTracker.getTimeRange(bytes);
assertEquals(src.getMin(), tgt.getMin());
assertEquals(src.getMax(), tgt.getMax());
}
@Test
public void testSerialization() throws IOException {
TimeRangeTracker src = getTimeRangeTracker(1, 2);
TimeRangeTracker tgt = getTimeRangeTracker();
Writables.copyWritable(src, tgt);
assertEquals(src.getMin(), tgt.getMin());
assertEquals(src.getMax(), tgt.getMax());
}
@Test
public void testAlwaysDecrementingSetsMaximum() {
TimeRangeTracker trr = getTimeRangeTracker();
trr.includeTimestamp(3);
trr.includeTimestamp(2);
trr.includeTimestamp(1);
assertTrue(trr.getMin() != TimeRangeTracker.INITIAL_MIN_TIMESTAMP);
assertTrue(trr.getMax() != -1 /*The initial max value*/);
}
@Test
public void testSimpleInRange() {
TimeRangeTracker trr = getTimeRangeTracker();
trr.includeTimestamp(0);
trr.includeTimestamp(2);
assertTrue(trr.includesTimeRange(new TimeRange(1)));
}
@Test
public void testRangeConstruction() throws IOException {
TimeRange defaultRange = new TimeRange();
assertEquals(0L, defaultRange.getMin());
assertEquals(Long.MAX_VALUE, defaultRange.getMax());
assertTrue(defaultRange.isAllTime());
TimeRange oneArgRange = new TimeRange(0L);
assertEquals(0L, oneArgRange.getMin());
assertEquals(Long.MAX_VALUE, oneArgRange.getMax());
assertTrue(oneArgRange.isAllTime());
TimeRange oneArgRange2 = new TimeRange(1);
assertEquals(1, oneArgRange2.getMin());
assertEquals(Long.MAX_VALUE, oneArgRange2.getMax());
assertFalse(oneArgRange2.isAllTime());
TimeRange twoArgRange = new TimeRange(0L, Long.MAX_VALUE);
assertEquals(0L, twoArgRange.getMin());
assertEquals(Long.MAX_VALUE, twoArgRange.getMax());
assertTrue(twoArgRange.isAllTime());
TimeRange twoArgRange2 = new TimeRange(0L, Long.MAX_VALUE - 1);
assertEquals(0L, twoArgRange2.getMin());
assertEquals(Long.MAX_VALUE - 1, twoArgRange2.getMax());
assertFalse(twoArgRange2.isAllTime());
TimeRange twoArgRange3 = new TimeRange(1, Long.MAX_VALUE);
assertEquals(1, twoArgRange3.getMin());
assertEquals(Long.MAX_VALUE, twoArgRange3.getMax());
assertFalse(twoArgRange3.isAllTime());
}
}

View File

@ -17,76 +17,27 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestTimeRangeTracker {
public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {
private static final int NUM_KEYS = 10000000;
private static final int NUM_OF_THREADS = 20;
@Test
public void testExtreme() {
TimeRange tr = new TimeRange();
assertTrue(tr.includesTimeRange(new TimeRange()));
TimeRangeTracker trt = new TimeRangeTracker();
assertFalse(trt.includesTimeRange(new TimeRange()));
trt.includeTimestamp(1);
trt.includeTimestamp(10);
assertTrue(trt.includesTimeRange(new TimeRange()));
protected TimeRangeTracker getTimeRangeTracker() {
return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
}
@Test
public void testTimeRangeInitialized() {
TimeRangeTracker src = new TimeRangeTracker();
TimeRange tr = new TimeRange(System.currentTimeMillis());
assertFalse(src.includesTimeRange(tr));
}
@Test
public void testTimeRangeTrackerNullIsSameAsTimeRangeNull() throws IOException {
TimeRangeTracker src = new TimeRangeTracker(1, 2);
byte [] bytes = Writables.getBytes(src);
TimeRange tgt = TimeRangeTracker.getTimeRange(bytes);
assertEquals(src.getMin(), tgt.getMin());
assertEquals(src.getMax(), tgt.getMax());
}
@Test
public void testSerialization() throws IOException {
TimeRangeTracker src = new TimeRangeTracker(1, 2);
TimeRangeTracker tgt = new TimeRangeTracker();
Writables.copyWritable(src, tgt);
assertEquals(src.getMin(), tgt.getMin());
assertEquals(src.getMax(), tgt.getMax());
}
@Test
public void testAlwaysDecrementingSetsMaximum() {
TimeRangeTracker trr = new TimeRangeTracker();
trr.includeTimestamp(3);
trr.includeTimestamp(2);
trr.includeTimestamp(1);
assertTrue(trr.getMin() != TimeRangeTracker.INITIAL_MIN_TIMESTAMP);
assertTrue(trr.getMax() != -1 /*The initial max value*/);
}
@Test
public void testSimpleInRange() {
TimeRangeTracker trr = new TimeRangeTracker();
trr.includeTimestamp(0);
trr.includeTimestamp(2);
assertTrue(trr.includesTimeRange(new TimeRange(1)));
protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
}
/**
@ -97,7 +48,7 @@ public class TestTimeRangeTracker {
*/
@Test
public void testArriveAtRightAnswer() throws InterruptedException {
final TimeRangeTracker trr = new TimeRangeTracker();
final TimeRangeTracker trr = getTimeRangeTracker();
final int threadCount = 10;
final int calls = 1000 * 1000;
Thread [] threads = new Thread[threadCount];
@ -126,41 +77,6 @@ public class TestTimeRangeTracker {
assertTrue(trr.getMin() == 0);
}
@Test
public void testRangeConstruction() throws IOException {
TimeRange defaultRange = new TimeRange();
assertEquals(0L, defaultRange.getMin());
assertEquals(Long.MAX_VALUE, defaultRange.getMax());
assertTrue(defaultRange.isAllTime());
TimeRange oneArgRange = new TimeRange(0L);
assertEquals(0L, oneArgRange.getMin());
assertEquals(Long.MAX_VALUE, oneArgRange.getMax());
assertTrue(oneArgRange.isAllTime());
TimeRange oneArgRange2 = new TimeRange(1);
assertEquals(1, oneArgRange2.getMin());
assertEquals(Long.MAX_VALUE, oneArgRange2.getMax());
assertFalse(oneArgRange2.isAllTime());
TimeRange twoArgRange = new TimeRange(0L, Long.MAX_VALUE);
assertEquals(0L, twoArgRange.getMin());
assertEquals(Long.MAX_VALUE, twoArgRange.getMax());
assertTrue(twoArgRange.isAllTime());
TimeRange twoArgRange2 = new TimeRange(0L, Long.MAX_VALUE - 1);
assertEquals(0L, twoArgRange2.getMin());
assertEquals(Long.MAX_VALUE - 1, twoArgRange2.getMax());
assertFalse(twoArgRange2.isAllTime());
TimeRange twoArgRange3 = new TimeRange(1, Long.MAX_VALUE);
assertEquals(1, twoArgRange3.getMin());
assertEquals(Long.MAX_VALUE, twoArgRange3.getMax());
assertFalse(twoArgRange3.isAllTime());
}
final static int NUM_OF_THREADS = 20;
class RandomTestData {
private long[] keys = new long[NUM_KEYS];
private long min = Long.MAX_VALUE;
@ -226,7 +142,7 @@ public class TestTimeRangeTracker {
}
}
TimeRangeTracker trt = new TimeRangeTracker();
TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
Thread[] t = new Thread[NUM_OF_THREADS];
for (int i = 0; i < NUM_OF_THREADS; i++) {
@ -245,32 +161,4 @@ public class TestTimeRangeTracker {
assertTrue(min == trt.getMin());
assertTrue(max == trt.getMax());
}
/**
* Bit of code to test concurrent access on this class.
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final TimeRangeTracker trr = new TimeRangeTracker();
final int threadCount = 5;
final int calls = 1024 * 1024 * 128;
Thread [] threads = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
Thread t = new Thread("" + i) {
@Override
public void run() {
for (int i = 0; i < calls; i++) trr.includeTimestamp(i);
}
};
t.start();
threads[i] = t;
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
System.out.println(trr.getMin() + " " + trr.getMax() + " " +
(System.currentTimeMillis() - start));
}
}