mirror of https://github.com/apache/lucene.git
LUCENE-4116: fix concurrency test for DWPTStallControl
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1348621 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cbbf7bfc6a
commit
76c15cab19
|
@ -39,7 +39,6 @@ import org.apache.lucene.util.ThreadInterruptedException;
|
|||
final class DocumentsWriterStallControl {
|
||||
@SuppressWarnings("serial")
|
||||
private static final class Sync extends AbstractQueuedSynchronizer {
|
||||
volatile boolean hasBlockedThreads = false; // only with assert
|
||||
|
||||
Sync() {
|
||||
setState(0);
|
||||
|
@ -67,15 +66,10 @@ final class DocumentsWriterStallControl {
|
|||
|
||||
@Override
|
||||
public int tryAcquireShared(int acquires) {
|
||||
assert maybeSetHasBlocked(getState());
|
||||
return getState() == 0 ? 1 : -1;
|
||||
}
|
||||
|
||||
// only used for testing
|
||||
private boolean maybeSetHasBlocked(int state) {
|
||||
hasBlockedThreads |= getState() != 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean tryReleaseShared(int newState) {
|
||||
|
@ -130,7 +124,7 @@ final class DocumentsWriterStallControl {
|
|||
}
|
||||
|
||||
boolean hasBlocked() { // for tests
|
||||
return sync.hasBlockedThreads;
|
||||
return sync.hasQueuedThreads();
|
||||
}
|
||||
|
||||
static interface MemoryController {
|
||||
|
@ -138,4 +132,12 @@ final class DocumentsWriterStallControl {
|
|||
long flushBytes();
|
||||
long stallLimitBytes();
|
||||
}
|
||||
|
||||
public boolean isHealthy() {
|
||||
return sync.isHealthy();
|
||||
}
|
||||
|
||||
public boolean isThreadQueued(Thread t) {
|
||||
return sync.isQueued(t);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,22 +127,19 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
int numStallers = atLeast(1);
|
||||
int numReleasers = atLeast(1);
|
||||
int numWaiters = atLeast(1);
|
||||
|
||||
final CountDownLatch[] latches = new CountDownLatch[] {
|
||||
new CountDownLatch(numStallers + numReleasers), new CountDownLatch(1),
|
||||
new CountDownLatch(numWaiters)};
|
||||
final Synchonizer sync = new Synchonizer(numStallers + numReleasers, numStallers + numReleasers+numWaiters);
|
||||
Thread[] threads = new Thread[numReleasers + numStallers + numWaiters];
|
||||
List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
for (int i = 0; i < numReleasers; i++) {
|
||||
threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, exceptions);
|
||||
threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, exceptions);
|
||||
}
|
||||
for (int i = numReleasers; i < numReleasers + numStallers; i++) {
|
||||
threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, exceptions);
|
||||
threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, exceptions);
|
||||
|
||||
}
|
||||
for (int i = numReleasers + numStallers; i < numReleasers + numStallers
|
||||
+ numWaiters; i++) {
|
||||
threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions);
|
||||
threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions);
|
||||
|
||||
}
|
||||
|
||||
|
@ -151,7 +148,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
for (int i = 0; i < iters; i++) {
|
||||
if (checkPoint.get()) {
|
||||
|
||||
assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS));
|
||||
assertTrue("timed out waiting for update threads - deadlock?", sync.updateJoin.await(10, TimeUnit.SECONDS));
|
||||
if (!exceptions.isEmpty()) {
|
||||
for (Throwable throwable : exceptions) {
|
||||
throwable.printStackTrace();
|
||||
|
@ -159,27 +156,38 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
fail("got exceptions in threads");
|
||||
}
|
||||
|
||||
if (!ctrl.anyStalledThreads()) {
|
||||
assertTrue(
|
||||
"control claims no stalled threads but waiter seems to be blocked",
|
||||
latches[2].await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
checkPoint.set(false);
|
||||
if (ctrl.hasBlocked() && ctrl.isHealthy()) {
|
||||
assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
|
||||
|
||||
|
||||
}
|
||||
|
||||
latches[1].countDown();
|
||||
checkPoint.set(false);
|
||||
sync.waiter.countDown();
|
||||
sync.leftCheckpoint.await();
|
||||
}
|
||||
assertFalse(checkPoint.get());
|
||||
assertEquals(0, sync.waiter.getCount());
|
||||
if (random().nextInt(2) == 0) {
|
||||
latches[0] = new CountDownLatch(numStallers + numReleasers);
|
||||
latches[1] = new CountDownLatch(1);
|
||||
latches[2] = new CountDownLatch(numWaiters);
|
||||
sync.reset(numStallers + numReleasers, numStallers + numReleasers
|
||||
+ numWaiters);
|
||||
checkPoint.set(true);
|
||||
}
|
||||
|
||||
}
|
||||
if (!checkPoint.get()) {
|
||||
sync.reset(numStallers + numReleasers, numStallers + numReleasers
|
||||
+ numWaiters);
|
||||
checkPoint.set(true);
|
||||
}
|
||||
|
||||
assertTrue(sync.updateJoin.await(10, TimeUnit.SECONDS));
|
||||
assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
|
||||
checkPoint.set(false);
|
||||
stop.set(true);
|
||||
latches[1].countDown();
|
||||
sync.waiter.countDown();
|
||||
sync.leftCheckpoint.await();
|
||||
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
memCtrl.limit = 1000;
|
||||
|
@ -196,20 +204,45 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertState(int numReleasers, int numStallers, int numWaiters, Thread[] threads, DocumentsWriterStallControl ctrl) throws InterruptedException {
|
||||
int millisToSleep = 100;
|
||||
while (true) {
|
||||
if (ctrl.hasBlocked() && ctrl.isHealthy()) {
|
||||
for (int n = numReleasers + numStallers; n < numReleasers
|
||||
+ numStallers + numWaiters; n++) {
|
||||
if (ctrl.isThreadQueued(threads[n])) {
|
||||
if (millisToSleep < 60000) {
|
||||
Thread.sleep(millisToSleep);
|
||||
millisToSleep *=2;
|
||||
break;
|
||||
} else {
|
||||
fail("control claims no stalled threads but waiter seems to be blocked ");
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Waiter extends Thread {
|
||||
private CountDownLatch[] latches;
|
||||
private Synchonizer sync;
|
||||
private DocumentsWriterStallControl ctrl;
|
||||
private AtomicBoolean checkPoint;
|
||||
private AtomicBoolean stop;
|
||||
private List<Throwable> exceptions;
|
||||
|
||||
public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint,
|
||||
DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
|
||||
DocumentsWriterStallControl ctrl, Synchonizer sync,
|
||||
List<Throwable> exceptions) {
|
||||
super("waiter");
|
||||
this.stop = stop;
|
||||
this.checkPoint = checkPoint;
|
||||
this.ctrl = ctrl;
|
||||
this.latches = latches;
|
||||
this.sync = sync;
|
||||
this.exceptions = exceptions;
|
||||
}
|
||||
|
||||
|
@ -218,13 +251,10 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
while (!stop.get()) {
|
||||
ctrl.waitIfStalled();
|
||||
if (checkPoint.get()) {
|
||||
CountDownLatch join = latches[2];
|
||||
CountDownLatch wait = latches[1];
|
||||
join.countDown();
|
||||
try {
|
||||
assertTrue(wait.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(sync.await());
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount());
|
||||
System.out.println("[Waiter] got interrupted - wait count: " + sync.waiter.getCount());
|
||||
throw new ThreadInterruptedException(e);
|
||||
}
|
||||
}
|
||||
|
@ -238,7 +268,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
|
||||
public static class Updater extends Thread {
|
||||
|
||||
private CountDownLatch[] latches;
|
||||
private Synchonizer sync;
|
||||
private DocumentsWriterStallControl ctrl;
|
||||
private AtomicBoolean checkPoint;
|
||||
private AtomicBoolean stop;
|
||||
|
@ -246,12 +276,13 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
private List<Throwable> exceptions;
|
||||
|
||||
public Updater(AtomicBoolean stop, AtomicBoolean checkPoint,
|
||||
DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
|
||||
DocumentsWriterStallControl ctrl, Synchonizer sync,
|
||||
boolean release, List<Throwable> exceptions) {
|
||||
super("updater");
|
||||
this.stop = stop;
|
||||
this.checkPoint = checkPoint;
|
||||
this.ctrl = ctrl;
|
||||
this.latches = latches;
|
||||
this.sync = sync;
|
||||
this.release = release;
|
||||
this.exceptions = exceptions;
|
||||
}
|
||||
|
@ -268,22 +299,24 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
ctrl.updateStalled(memCtrl);
|
||||
}
|
||||
if (checkPoint.get()) {
|
||||
CountDownLatch join = latches[0];
|
||||
CountDownLatch wait = latches[1];
|
||||
join.countDown();
|
||||
sync.updateJoin.countDown();
|
||||
try {
|
||||
assertTrue(wait.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(sync.await());
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println("[Updater] got interrupted - wait count: " + wait.getCount());
|
||||
System.out.println("[Updater] got interrupted - wait count: " + sync.waiter.getCount());
|
||||
throw new ThreadInterruptedException(e);
|
||||
}
|
||||
sync.leftCheckpoint.countDown();
|
||||
}
|
||||
if (random().nextBoolean()) {
|
||||
Thread.yield();
|
||||
}
|
||||
Thread.yield();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
exceptions.add(e);
|
||||
}
|
||||
sync.updateJoin.countDown();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -366,4 +399,25 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class Synchonizer {
|
||||
volatile CountDownLatch waiter;
|
||||
volatile CountDownLatch updateJoin;
|
||||
volatile CountDownLatch leftCheckpoint;
|
||||
|
||||
public Synchonizer(int numUpdater, int numThreads) {
|
||||
reset(numUpdater, numThreads);
|
||||
}
|
||||
|
||||
public void reset(int numUpdaters, int numThreads) {
|
||||
this.waiter = new CountDownLatch(1);
|
||||
this.updateJoin = new CountDownLatch(numUpdaters);
|
||||
this.leftCheckpoint = new CountDownLatch(numUpdaters);
|
||||
}
|
||||
|
||||
public boolean await() throws InterruptedException {
|
||||
return waiter.await(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue