HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)
This commit is contained in:
parent
9b26c9ff37
commit
6130ea4d54
|
@ -278,7 +278,7 @@ public class ClassSize {
|
||||||
|
|
||||||
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
|
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
|
||||||
|
|
||||||
TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
|
TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
|
||||||
|
|
||||||
CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
|
CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.TimeRange;
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores minimum and maximum timestamp values.
|
* Stores minimum and maximum timestamp values, it is [minimumTimestamp, maximumTimestamp] in
|
||||||
|
* interval notation.
|
||||||
* Use this class at write-time ONLY. Too much synchronization to use at read time
|
* Use this class at write-time ONLY. Too much synchronization to use at read time
|
||||||
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
|
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
|
||||||
* updates but then later we can make one as part of a compaction when there is only one thread
|
* updates but then later we can make one as part of a compaction when there is only one thread
|
||||||
|
@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
|
||||||
public class TimeRangeTracker implements Writable {
|
public class TimeRangeTracker implements Writable {
|
||||||
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
|
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
|
||||||
static final long INITIAL_MAX_TIMESTAMP = -1L;
|
static final long INITIAL_MAX_TIMESTAMP = -1L;
|
||||||
long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
|
|
||||||
long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
|
AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
|
||||||
|
AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor.
|
* Default constructor.
|
||||||
|
@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
|
||||||
* @param trt source TimeRangeTracker
|
* @param trt source TimeRangeTracker
|
||||||
*/
|
*/
|
||||||
public TimeRangeTracker(final TimeRangeTracker trt) {
|
public TimeRangeTracker(final TimeRangeTracker trt) {
|
||||||
set(trt.getMin(), trt.getMax());
|
minimumTimestamp.set(trt.getMin());
|
||||||
|
maximumTimestamp.set(trt.getMax());
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
|
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
|
||||||
set(minimumTimestamp, maximumTimestamp);
|
this.minimumTimestamp.set(minimumTimestamp);
|
||||||
}
|
this.maximumTimestamp.set(maximumTimestamp);
|
||||||
|
|
||||||
private void set(final long min, final long max) {
|
|
||||||
this.minimumTimestamp = min;
|
|
||||||
this.maximumTimestamp = max;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param l
|
|
||||||
* @return True if we initialized values
|
|
||||||
*/
|
|
||||||
private boolean init(final long l) {
|
|
||||||
if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
|
|
||||||
set(l, l);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||||
justification="Intentional")
|
justification="Intentional")
|
||||||
void includeTimestamp(final long timestamp) {
|
void includeTimestamp(final long timestamp) {
|
||||||
// Do test outside of synchronization block. Synchronization in here can be problematic
|
long initialMinTimestamp = this.minimumTimestamp.get();
|
||||||
// when many threads writing one Store -- they can all pile up trying to add in here.
|
if (timestamp < initialMinTimestamp) {
|
||||||
// Happens when doing big write upload where we are hammering on one region.
|
long curMinTimestamp = initialMinTimestamp;
|
||||||
if (timestamp < this.minimumTimestamp) {
|
while (timestamp < curMinTimestamp) {
|
||||||
synchronized (this) {
|
if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
|
||||||
if (!init(timestamp)) {
|
curMinTimestamp = this.minimumTimestamp.get();
|
||||||
if (timestamp < this.minimumTimestamp) {
|
} else {
|
||||||
this.minimumTimestamp = timestamp;
|
// successfully set minimumTimestamp, break.
|
||||||
}
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (timestamp > this.maximumTimestamp) {
|
|
||||||
synchronized (this) {
|
// When it reaches here, there are two possibilities:
|
||||||
if (!init(timestamp)) {
|
// 1). timestamp >= curMinTimestamp, someone already sets the minimumTimestamp. In this case,
|
||||||
if (this.maximumTimestamp < timestamp) {
|
// it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see
|
||||||
this.maximumTimestamp = timestamp;
|
// if it needs to update minimumTimestamp. Someone may already set both
|
||||||
}
|
// minimumTimestamp/minimumTimestamp to the same value(curMinTimestamp),
|
||||||
|
// need to check if maximumTimestamp needs to be updated.
|
||||||
|
// 2). timestamp < curMinTimestamp, it sets the minimumTimestamp successfully.
|
||||||
|
// In this case,it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
|
||||||
|
// to see if it needs to set maximumTimestamp.
|
||||||
|
if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
|
||||||
|
// Someone already sets minimumTimestamp and timestamp is less than minimumTimestamp.
|
||||||
|
// In this case, no need to set maximumTimestamp as it will be set to at least
|
||||||
|
// initialMinTimestamp.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long curMaxTimestamp = this.maximumTimestamp.get();
|
||||||
|
|
||||||
|
if (timestamp > curMaxTimestamp) {
|
||||||
|
while (timestamp > curMaxTimestamp) {
|
||||||
|
if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
|
||||||
|
curMaxTimestamp = this.maximumTimestamp.get();
|
||||||
|
} else {
|
||||||
|
// successfully set maximumTimestamp, break
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the range has ANY overlap with TimeRange
|
* Check if the range has ANY overlap with TimeRange
|
||||||
* @param tr TimeRange
|
* @param tr TimeRange, it expects [minStamp, maxStamp)
|
||||||
* @return True if there is overlap, false otherwise
|
* @return True if there is overlap, false otherwise
|
||||||
*/
|
*/
|
||||||
public synchronized boolean includesTimeRange(final TimeRange tr) {
|
public boolean includesTimeRange(final TimeRange tr) {
|
||||||
return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
|
return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the minimumTimestamp
|
* @return the minimumTimestamp
|
||||||
*/
|
*/
|
||||||
public synchronized long getMin() {
|
public long getMin() {
|
||||||
return minimumTimestamp;
|
return minimumTimestamp.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the maximumTimestamp
|
* @return the maximumTimestamp
|
||||||
*/
|
*/
|
||||||
public synchronized long getMax() {
|
public long getMax() {
|
||||||
return maximumTimestamp;
|
return maximumTimestamp.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void write(final DataOutput out) throws IOException {
|
public void write(final DataOutput out) throws IOException {
|
||||||
out.writeLong(minimumTimestamp);
|
out.writeLong(minimumTimestamp.get());
|
||||||
out.writeLong(maximumTimestamp);
|
out.writeLong(maximumTimestamp.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void readFields(final DataInput in) throws IOException {
|
public void readFields(final DataInput in) throws IOException {
|
||||||
this.minimumTimestamp = in.readLong();
|
|
||||||
this.maximumTimestamp = in.readLong();
|
this.minimumTimestamp.set(in.readLong());
|
||||||
|
this.maximumTimestamp.set(in.readLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized String toString() {
|
public String toString() {
|
||||||
return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
|
return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
@Category({SmallTests.class})
|
@Category({SmallTests.class})
|
||||||
public class TestTimeRangeTracker {
|
public class TestTimeRangeTracker {
|
||||||
|
private static final int NUM_KEYS = 10000000;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExtreme() {
|
public void testExtreme() {
|
||||||
TimeRange tr = new TimeRange();
|
TimeRange tr = new TimeRange();
|
||||||
|
@ -117,6 +120,7 @@ public class TestTimeRangeTracker {
|
||||||
for (int i = 0; i < threads.length; i++) {
|
for (int i = 0; i < threads.length; i++) {
|
||||||
threads[i].join();
|
threads[i].join();
|
||||||
}
|
}
|
||||||
|
|
||||||
assertTrue(trr.getMax() == calls * threadCount);
|
assertTrue(trr.getMax() == calls * threadCount);
|
||||||
assertTrue(trr.getMin() == 0);
|
assertTrue(trr.getMin() == 0);
|
||||||
}
|
}
|
||||||
|
@ -154,6 +158,93 @@ public class TestTimeRangeTracker {
|
||||||
assertFalse(twoArgRange3.isAllTime());
|
assertFalse(twoArgRange3.isAllTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final static int NUM_OF_THREADS = 20;
|
||||||
|
|
||||||
|
class RandomTestData {
|
||||||
|
private long[] keys = new long[NUM_KEYS];
|
||||||
|
private long min = Long.MAX_VALUE;
|
||||||
|
private long max = 0;
|
||||||
|
|
||||||
|
public RandomTestData() {
|
||||||
|
if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
|
||||||
|
for (int i = 0; i < NUM_KEYS; i++) {
|
||||||
|
keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
|
||||||
|
if (keys[i] < min) min = keys[i];
|
||||||
|
if (keys[i] > max) max = keys[i];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int i = NUM_KEYS - 1; i >= 0; i--) {
|
||||||
|
keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
|
||||||
|
if (keys[i] < min) min = keys[i];
|
||||||
|
if (keys[i] > max) max = keys[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMax() {
|
||||||
|
return this.max;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMin() {
|
||||||
|
return this.min;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TrtUpdateRunnable implements Runnable {
|
||||||
|
|
||||||
|
private TimeRangeTracker trt;
|
||||||
|
private RandomTestData data;
|
||||||
|
public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
|
||||||
|
this.trt = trt;
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
for (long key : data.keys) {
|
||||||
|
trt.includeTimestamp(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
|
||||||
|
* at right range. The data chosen is going to ensure that there are lots collisions, i.e,
|
||||||
|
* some other threads may already update the value while one tries to update min/max value.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testConcurrentIncludeTimestampCorrectness() {
|
||||||
|
RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
|
||||||
|
long min = Long.MAX_VALUE, max = 0;
|
||||||
|
for (int i = 0; i < NUM_OF_THREADS; i ++) {
|
||||||
|
testData[i] = new RandomTestData();
|
||||||
|
if (testData[i].getMin() < min) {
|
||||||
|
min = testData[i].getMin();
|
||||||
|
}
|
||||||
|
if (testData[i].getMax() > max) {
|
||||||
|
max = testData[i].getMax();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TimeRangeTracker trt = new TimeRangeTracker();
|
||||||
|
|
||||||
|
Thread[] t = new Thread[NUM_OF_THREADS];
|
||||||
|
for (int i = 0; i < NUM_OF_THREADS; i++) {
|
||||||
|
t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
|
||||||
|
t[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Thread thread : t) {
|
||||||
|
try {
|
||||||
|
thread.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(min == trt.getMin());
|
||||||
|
assertTrue(max == trt.getMax());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bit of code to test concurrent access on this class.
|
* Bit of code to test concurrent access on this class.
|
||||||
* @param args
|
* @param args
|
||||||
|
|
Loading…
Reference in New Issue