diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index c0b3128eb0e..92fde2cc85b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -278,7 +278,7 @@ public class ClassSize { TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN); - TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); + TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE); CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java index 78950f841af..9b2a56aea71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Writable; - /** - * Stores minimum and maximum timestamp values. + * Stores minimum and maximum timestamp values, it is [minimumTimestamp, maximumTimestamp] in + * interval notation. * Use this class at write-time ONLY. Too much synchronization to use at read time * (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore * updates but then later we can make one as part of a compaction when there is only one thread @@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable; public class TimeRangeTracker implements Writable { static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE; static final long INITIAL_MAX_TIMESTAMP = -1L; - long minimumTimestamp = INITIAL_MIN_TIMESTAMP; - long maximumTimestamp = INITIAL_MAX_TIMESTAMP; + + AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP); + AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP); /** * Default constructor. @@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable { * @param trt source TimeRangeTracker */ public TimeRangeTracker(final TimeRangeTracker trt) { - set(trt.getMin(), trt.getMax()); + minimumTimestamp.set(trt.getMin()); + maximumTimestamp.set(trt.getMax()); } public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) { - set(minimumTimestamp, maximumTimestamp); - } - - private void set(final long min, final long max) { - this.minimumTimestamp = min; - this.maximumTimestamp = max; - } - - /** - * @param l - * @return True if we initialized values - */ - private boolean init(final long l) { - if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false; - set(l, l); - return true; + this.minimumTimestamp.set(minimumTimestamp); + this.maximumTimestamp.set(maximumTimestamp); } /** @@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS", justification="Intentional") void includeTimestamp(final long timestamp) { - // Do test outside of synchronization block. Synchronization in here can be problematic - // when many threads writing one Store -- they can all pile up trying to add in here. - // Happens when doing big write upload where we are hammering on one region. - if (timestamp < this.minimumTimestamp) { - synchronized (this) { - if (!init(timestamp)) { - if (timestamp < this.minimumTimestamp) { - this.minimumTimestamp = timestamp; - } + long initialMinTimestamp = this.minimumTimestamp.get(); + if (timestamp < initialMinTimestamp) { + long curMinTimestamp = initialMinTimestamp; + while (timestamp < curMinTimestamp) { + if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) { + curMinTimestamp = this.minimumTimestamp.get(); + } else { + // successfully set minimumTimestamp, break. + break; } } - } else if (timestamp > this.maximumTimestamp) { - synchronized (this) { - if (!init(timestamp)) { - if (this.maximumTimestamp < timestamp) { - this.maximumTimestamp = timestamp; - } + + // When it reaches here, there are two possibilities: + // 1). timestamp >= curMinTimestamp, someone already sets the minimumTimestamp. In this case, + // it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see + // if it needs to update minimumTimestamp. Someone may already set both + // minimumTimestamp/minimumTimestamp to the same value(curMinTimestamp), + // need to check if maximumTimestamp needs to be updated. + // 2). timestamp < curMinTimestamp, it sets the minimumTimestamp successfully. + // In this case,it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP + // to see if it needs to set maximumTimestamp. + if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) { + // Someone already sets minimumTimestamp and timestamp is less than minimumTimestamp. + // In this case, no need to set maximumTimestamp as it will be set to at least + // initialMinTimestamp. + return; + } + } + + long curMaxTimestamp = this.maximumTimestamp.get(); + + if (timestamp > curMaxTimestamp) { + while (timestamp > curMaxTimestamp) { + if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) { + curMaxTimestamp = this.maximumTimestamp.get(); + } else { + // successfully set maximumTimestamp, break + break; } } } @@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable { /** * Check if the range has ANY overlap with TimeRange - * @param tr TimeRange + * @param tr TimeRange, it expects [minStamp, maxStamp) * @return True if there is overlap, false otherwise */ - public synchronized boolean includesTimeRange(final TimeRange tr) { - return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin()); + public boolean includesTimeRange(final TimeRange tr) { + return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin()); } /** * @return the minimumTimestamp */ - public synchronized long getMin() { - return minimumTimestamp; + public long getMin() { + return minimumTimestamp.get(); } /** * @return the maximumTimestamp */ - public synchronized long getMax() { - return maximumTimestamp; + public long getMax() { + return maximumTimestamp.get(); } - public synchronized void write(final DataOutput out) throws IOException { - out.writeLong(minimumTimestamp); - out.writeLong(maximumTimestamp); + public void write(final DataOutput out) throws IOException { + out.writeLong(minimumTimestamp.get()); + out.writeLong(maximumTimestamp.get()); } - public synchronized void readFields(final DataInput in) throws IOException { - this.minimumTimestamp = in.readLong(); - this.maximumTimestamp = in.readLong(); + public void readFields(final DataInput in) throws IOException { + + this.minimumTimestamp.set(in.readLong()); + this.maximumTimestamp.set(in.readLong()); } @Override - public synchronized String toString() { - return "[" + minimumTimestamp + "," + maximumTimestamp + "]"; + public String toString() { + return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]"; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java index 377801ccf71..4e610676c0d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTimeRangeTracker.java @@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Writables; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.concurrent.ThreadLocalRandom; @Category({SmallTests.class}) public class TestTimeRangeTracker { + private static final int NUM_KEYS = 10000000; + @Test public void testExtreme() { TimeRange tr = new TimeRange(); @@ -117,6 +120,7 @@ public class TestTimeRangeTracker { for (int i = 0; i < threads.length; i++) { threads[i].join(); } + assertTrue(trr.getMax() == calls * threadCount); assertTrue(trr.getMin() == 0); } @@ -154,6 +158,93 @@ public class TestTimeRangeTracker { assertFalse(twoArgRange3.isAllTime()); } + final static int NUM_OF_THREADS = 20; + + class RandomTestData { + private long[] keys = new long[NUM_KEYS]; + private long min = Long.MAX_VALUE; + private long max = 0; + + public RandomTestData() { + if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) { + for (int i = 0; i < NUM_KEYS; i++) { + keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); + if (keys[i] < min) min = keys[i]; + if (keys[i] > max) max = keys[i]; + } + } else { + for (int i = NUM_KEYS - 1; i >= 0; i--) { + keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); + if (keys[i] < min) min = keys[i]; + if (keys[i] > max) max = keys[i]; + } + } + } + + public long getMax() { + return this.max; + } + + public long getMin() { + return this.min; + } + } + + class TrtUpdateRunnable implements Runnable { + + private TimeRangeTracker trt; + private RandomTestData data; + public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) { + this.trt = trt; + this.data = data; + } + + public void run() { + for (long key : data.keys) { + trt.includeTimestamp(key); + } + } + } + + /** + * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive + * at right range. The data chosen is going to ensure that there are lots collisions, i.e, + * some other threads may already update the value while one tries to update min/max value. + */ + @Test + public void testConcurrentIncludeTimestampCorrectness() { + RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS]; + long min = Long.MAX_VALUE, max = 0; + for (int i = 0; i < NUM_OF_THREADS; i ++) { + testData[i] = new RandomTestData(); + if (testData[i].getMin() < min) { + min = testData[i].getMin(); + } + if (testData[i].getMax() > max) { + max = testData[i].getMax(); + } + } + + TimeRangeTracker trt = new TimeRangeTracker(); + + Thread[] t = new Thread[NUM_OF_THREADS]; + for (int i = 0; i < NUM_OF_THREADS; i++) { + t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i])); + t[i].start(); + } + + for (Thread thread : t) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + assertTrue(min == trt.getMin()); + assertTrue(max == trt.getMax()); + } + /** * Bit of code to test concurrent access on this class. * @param args