HBASE-12148 Remove TimeRangeTracker as point of contention when many threads writing a Store (Huaxiang Sun)
This commit is contained in:
parent
1576269123
commit
dd1ae37148
|
@ -313,8 +313,7 @@ public class ClassSize {
|
|||
|
||||
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
|
||||
|
||||
TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
|
||||
CELL_SET = align(OBJECT + REFERENCE);
|
||||
|
||||
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
|
@ -28,9 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Stores minimum and maximum timestamp values.
|
||||
* Stores minimum and maximum timestamp values, it is [minimumTimestamp, maximumTimestamp] in
|
||||
* interval notation.
|
||||
* Use this class at write-time ONLY. Too much synchronization to use at read time
|
||||
* (TODO: there are two scenarios writing, once when lots of concurrency as part of memstore
|
||||
* updates but then later we can make one as part of a compaction when there is only one thread
|
||||
|
@ -46,8 +47,9 @@ import org.apache.hadoop.io.Writable;
|
|||
public class TimeRangeTracker implements Writable {
|
||||
static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE;
|
||||
static final long INITIAL_MAX_TIMESTAMP = -1L;
|
||||
long minimumTimestamp = INITIAL_MIN_TIMESTAMP;
|
||||
long maximumTimestamp = INITIAL_MAX_TIMESTAMP;
|
||||
|
||||
AtomicLong minimumTimestamp = new AtomicLong(INITIAL_MIN_TIMESTAMP);
|
||||
AtomicLong maximumTimestamp = new AtomicLong(INITIAL_MAX_TIMESTAMP);
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
|
@ -60,26 +62,13 @@ public class TimeRangeTracker implements Writable {
|
|||
* @param trt source TimeRangeTracker
|
||||
*/
|
||||
public TimeRangeTracker(final TimeRangeTracker trt) {
|
||||
set(trt.getMin(), trt.getMax());
|
||||
minimumTimestamp.set(trt.getMin());
|
||||
maximumTimestamp.set(trt.getMax());
|
||||
}
|
||||
|
||||
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
|
||||
set(minimumTimestamp, maximumTimestamp);
|
||||
}
|
||||
|
||||
private void set(final long min, final long max) {
|
||||
this.minimumTimestamp = min;
|
||||
this.maximumTimestamp = max;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param l
|
||||
* @return True if we initialized values
|
||||
*/
|
||||
private boolean init(final long l) {
|
||||
if (this.minimumTimestamp != INITIAL_MIN_TIMESTAMP) return false;
|
||||
set(l, l);
|
||||
return true;
|
||||
this.minimumTimestamp.set(minimumTimestamp);
|
||||
this.maximumTimestamp.set(maximumTimestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -102,23 +91,44 @@ public class TimeRangeTracker implements Writable {
|
|||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||
justification="Intentional")
|
||||
void includeTimestamp(final long timestamp) {
|
||||
// Do test outside of synchronization block. Synchronization in here can be problematic
|
||||
// when many threads writing one Store -- they can all pile up trying to add in here.
|
||||
// Happens when doing big write upload where we are hammering on one region.
|
||||
if (timestamp < this.minimumTimestamp) {
|
||||
synchronized (this) {
|
||||
if (!init(timestamp)) {
|
||||
if (timestamp < this.minimumTimestamp) {
|
||||
this.minimumTimestamp = timestamp;
|
||||
long initialMinTimestamp = this.minimumTimestamp.get();
|
||||
if (timestamp < initialMinTimestamp) {
|
||||
long curMinTimestamp = initialMinTimestamp;
|
||||
while (timestamp < curMinTimestamp) {
|
||||
if (!this.minimumTimestamp.compareAndSet(curMinTimestamp, timestamp)) {
|
||||
curMinTimestamp = this.minimumTimestamp.get();
|
||||
} else {
|
||||
// successfully set minimumTimestamp, break.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// When it reaches here, there are two possibilities:
|
||||
// 1). timestamp >= curMinTimestamp, someone already sets the minimumTimestamp. In this case,
|
||||
// it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP to see
|
||||
// if it needs to update minimumTimestamp. Someone may already set both
|
||||
// minimumTimestamp/minimumTimestamp to the same value(curMinTimestamp),
|
||||
// need to check if maximumTimestamp needs to be updated.
|
||||
// 2). timestamp < curMinTimestamp, it sets the minimumTimestamp successfully.
|
||||
// In this case,it still needs to check if initialMinTimestamp == INITIAL_MIN_TIMESTAMP
|
||||
// to see if it needs to set maximumTimestamp.
|
||||
if (initialMinTimestamp != INITIAL_MIN_TIMESTAMP) {
|
||||
// Someone already sets minimumTimestamp and timestamp is less than minimumTimestamp.
|
||||
// In this case, no need to set maximumTimestamp as it will be set to at least
|
||||
// initialMinTimestamp.
|
||||
return;
|
||||
}
|
||||
} else if (timestamp > this.maximumTimestamp) {
|
||||
synchronized (this) {
|
||||
if (!init(timestamp)) {
|
||||
if (this.maximumTimestamp < timestamp) {
|
||||
this.maximumTimestamp = timestamp;
|
||||
}
|
||||
|
||||
long curMaxTimestamp = this.maximumTimestamp.get();
|
||||
|
||||
if (timestamp > curMaxTimestamp) {
|
||||
while (timestamp > curMaxTimestamp) {
|
||||
if (!this.maximumTimestamp.compareAndSet(curMaxTimestamp, timestamp)) {
|
||||
curMaxTimestamp = this.maximumTimestamp.get();
|
||||
} else {
|
||||
// successfully set maximumTimestamp, break
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,40 +136,41 @@ public class TimeRangeTracker implements Writable {
|
|||
|
||||
/**
|
||||
* Check if the range has ANY overlap with TimeRange
|
||||
* @param tr TimeRange
|
||||
* @param tr TimeRange, it expects [minStamp, maxStamp)
|
||||
* @return True if there is overlap, false otherwise
|
||||
*/
|
||||
public synchronized boolean includesTimeRange(final TimeRange tr) {
|
||||
return (this.minimumTimestamp < tr.getMax() && this.maximumTimestamp >= tr.getMin());
|
||||
public boolean includesTimeRange(final TimeRange tr) {
|
||||
return (this.minimumTimestamp.get() < tr.getMax() && this.maximumTimestamp.get() >= tr.getMin());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the minimumTimestamp
|
||||
*/
|
||||
public synchronized long getMin() {
|
||||
return minimumTimestamp;
|
||||
public long getMin() {
|
||||
return minimumTimestamp.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximumTimestamp
|
||||
*/
|
||||
public synchronized long getMax() {
|
||||
return maximumTimestamp;
|
||||
public long getMax() {
|
||||
return maximumTimestamp.get();
|
||||
}
|
||||
|
||||
public synchronized void write(final DataOutput out) throws IOException {
|
||||
out.writeLong(minimumTimestamp);
|
||||
out.writeLong(maximumTimestamp);
|
||||
public void write(final DataOutput out) throws IOException {
|
||||
out.writeLong(minimumTimestamp.get());
|
||||
out.writeLong(maximumTimestamp.get());
|
||||
}
|
||||
|
||||
public synchronized void readFields(final DataInput in) throws IOException {
|
||||
this.minimumTimestamp = in.readLong();
|
||||
this.maximumTimestamp = in.readLong();
|
||||
public void readFields(final DataInput in) throws IOException {
|
||||
|
||||
this.minimumTimestamp.set(in.readLong());
|
||||
this.maximumTimestamp.set(in.readLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
|
||||
public String toString() {
|
||||
return "[" + minimumTimestamp.get() + "," + maximumTimestamp.get() + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@Category({RegionServerTests.class, SmallTests.class})
|
||||
public class TestTimeRangeTracker {
|
||||
private static final int NUM_KEYS = 10000000;
|
||||
|
||||
@Test
|
||||
public void testExtreme() {
|
||||
TimeRange tr = new TimeRange();
|
||||
|
@ -119,6 +121,7 @@ public class TestTimeRangeTracker {
|
|||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
assertTrue(trr.getMax() == calls * threadCount);
|
||||
assertTrue(trr.getMin() == 0);
|
||||
}
|
||||
|
@ -156,6 +159,93 @@ public class TestTimeRangeTracker {
|
|||
assertFalse(twoArgRange3.isAllTime());
|
||||
}
|
||||
|
||||
final static int NUM_OF_THREADS = 20;
|
||||
|
||||
class RandomTestData {
|
||||
private long[] keys = new long[NUM_KEYS];
|
||||
private long min = Long.MAX_VALUE;
|
||||
private long max = 0;
|
||||
|
||||
public RandomTestData() {
|
||||
if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
|
||||
for (int i = 0; i < NUM_KEYS; i++) {
|
||||
keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
|
||||
if (keys[i] < min) min = keys[i];
|
||||
if (keys[i] > max) max = keys[i];
|
||||
}
|
||||
} else {
|
||||
for (int i = NUM_KEYS - 1; i >= 0; i--) {
|
||||
keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
|
||||
if (keys[i] < min) min = keys[i];
|
||||
if (keys[i] > max) max = keys[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long getMax() {
|
||||
return this.max;
|
||||
}
|
||||
|
||||
public long getMin() {
|
||||
return this.min;
|
||||
}
|
||||
}
|
||||
|
||||
class TrtUpdateRunnable implements Runnable {
|
||||
|
||||
private TimeRangeTracker trt;
|
||||
private RandomTestData data;
|
||||
public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
|
||||
this.trt = trt;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
for (long key : data.keys) {
|
||||
trt.includeTimestamp(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
|
||||
* at right range. The data chosen is going to ensure that there are lots collisions, i.e,
|
||||
* some other threads may already update the value while one tries to update min/max value.
|
||||
*/
|
||||
@Test
|
||||
public void testConcurrentIncludeTimestampCorrectness() {
|
||||
RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
|
||||
long min = Long.MAX_VALUE, max = 0;
|
||||
for (int i = 0; i < NUM_OF_THREADS; i ++) {
|
||||
testData[i] = new RandomTestData();
|
||||
if (testData[i].getMin() < min) {
|
||||
min = testData[i].getMin();
|
||||
}
|
||||
if (testData[i].getMax() > max) {
|
||||
max = testData[i].getMax();
|
||||
}
|
||||
}
|
||||
|
||||
TimeRangeTracker trt = new TimeRangeTracker();
|
||||
|
||||
Thread[] t = new Thread[NUM_OF_THREADS];
|
||||
for (int i = 0; i < NUM_OF_THREADS; i++) {
|
||||
t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
|
||||
t[i].start();
|
||||
}
|
||||
|
||||
for (Thread thread : t) {
|
||||
try {
|
||||
thread.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(min == trt.getMin());
|
||||
assertTrue(max == trt.getMax());
|
||||
}
|
||||
|
||||
/**
|
||||
* Bit of code to test concurrent access on this class.
|
||||
* @param args
|
||||
|
|
Loading…
Reference in New Issue