Use CyclicBarriers for sychronizing driver and test threads

This commit modifies some tests to use CyclicBarriers to correctly and
simply sychronize driver and test threads.
This commit is contained in:
Jason Tedor 2016-01-06 15:07:05 -05:00
parent 2cc979ee77
commit 4c0f5bda47
2 changed files with 55 additions and 69 deletions

View File

@ -886,7 +886,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
} }
} }
public void testClusterStateBatchedUpdates() throws InterruptedException { public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.type", "local") .put("discovery.type", "local")
.build(); .build();
@ -974,19 +974,12 @@ public class ClusterServiceIT extends ESIntegTestCase {
counts.merge(executor, 1, (previous, one) -> previous + one); counts.merge(executor, 1, (previous, one) -> previous + one);
} }
CountDownLatch startGate = new CountDownLatch(1); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
CountDownLatch endGate = new CountDownLatch(numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
final int index = i; final int index = i;
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
try { try {
try { barrier.await();
startGate.await();
} catch (InterruptedException e) {
interrupted.set(true);
return;
}
for (int j = 0; j < tasksSubmittedPerThread; j++) { for (int j = 0; j < tasksSubmittedPerThread; j++) {
ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j); ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
clusterService.submitStateUpdateTask( clusterService.submitStateUpdateTask(
@ -996,16 +989,18 @@ public class ClusterServiceIT extends ESIntegTestCase {
executor, executor,
listener); listener);
} }
} finally { barrier.await();
endGate.countDown(); } catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
} }
}); });
thread.start(); thread.start();
} }
startGate.countDown(); // wait for all threads to be ready
endGate.await(); barrier.await();
assertFalse(interrupted.get()); // wait for all threads to finish
barrier.await();
// wait until all the cluster state updates have been processed // wait until all the cluster state updates have been processed
updateLatch.await(); updateLatch.await();

View File

@ -31,7 +31,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -491,25 +493,19 @@ public class CacheTests extends ESTestCase {
} }
} }
public void testComputeIfAbsentCallsOnce() throws InterruptedException { public void testComputeIfAbsentCallsOnce() throws BrokenBarrierException, InterruptedException {
int numberOfThreads = randomIntBetween(2, 32); int numberOfThreads = randomIntBetween(2, 32);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build(); final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries); AtomicReferenceArray flags = new AtomicReferenceArray(numberOfEntries);
for (int j = 0; j < numberOfEntries; j++) { for (int j = 0; j < numberOfEntries; j++) {
flags.set(j, false); flags.set(j, false);
} }
CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch endGate = new CountDownLatch(numberOfThreads); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
try { try {
try { barrier.await();
startGate.await();
} catch (InterruptedException e) {
interrupted.set(true);
return;
}
for (int j = 0; j < numberOfEntries; j++) { for (int j = 0; j < numberOfEntries; j++) {
try { try {
cache.computeIfAbsent(j, key -> { cache.computeIfAbsent(j, key -> {
@ -517,18 +513,21 @@ public class CacheTests extends ESTestCase {
return Integer.toString(key); return Integer.toString(key);
}); });
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw new RuntimeException(e); fail(e.getMessage());
} }
} }
} finally { barrier.await();
endGate.countDown(); } catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
} }
}); });
thread.start(); thread.start();
} }
startGate.countDown();
endGate.await(); // wait for all threads to be ready
assertFalse(interrupted.get()); barrier.await();
// wait for all threads to finish
barrier.await();
} }
public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() { public void testComputeIfAbsentThrowsExceptionIfLoaderReturnsANullValue() {
@ -541,7 +540,7 @@ public class CacheTests extends ESTestCase {
} }
} }
public void testDependentKeyDeadlock() throws InterruptedException { public void testDependentKeyDeadlock() throws BrokenBarrierException, InterruptedException {
class Key { class Key {
private final int key; private final int key;
@ -568,18 +567,17 @@ public class CacheTests extends ESTestCase {
int numberOfThreads = randomIntBetween(2, 32); int numberOfThreads = randomIntBetween(2, 32);
final Cache<Key, Integer> cache = CacheBuilder.<Key, Integer>builder().build(); final Cache<Key, Integer> cache = CacheBuilder.<Key, Integer>builder().build();
CountDownLatch startGate = new CountDownLatch(1);
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads); CountDownLatch deadlockLatch = new CountDownLatch(numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
List<Thread> threads = new ArrayList<>(); List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
try { try {
try { try {
startGate.await(); barrier.await();
} catch (InterruptedException e) { } catch (BrokenBarrierException | InterruptedException e) {
interrupted.set(true); throw new AssertionError(e);
return;
} }
Random random = new Random(random().nextLong()); Random random = new Random(random().nextLong());
for (int j = 0; j < numberOfEntries; j++) { for (int j = 0; j < numberOfEntries; j++) {
@ -631,7 +629,7 @@ public class CacheTests extends ESTestCase {
}, 1, 1, TimeUnit.SECONDS); }, 1, 1, TimeUnit.SECONDS);
// everything is setup, release the hounds // everything is setup, release the hounds
startGate.countDown(); barrier.await();
// wait for either deadlock to be detected or the threads to terminate // wait for either deadlock to be detected or the threads to terminate
deadlockLatch.await(); deadlockLatch.await();
@ -642,21 +640,16 @@ public class CacheTests extends ESTestCase {
assertFalse("deadlock", deadlock.get()); assertFalse("deadlock", deadlock.get());
} }
public void testCachePollution() throws InterruptedException { public void testCachePollution() throws BrokenBarrierException, InterruptedException {
int numberOfThreads = randomIntBetween(2, 32); int numberOfThreads = randomIntBetween(2, 32);
final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build(); final Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch endGate = new CountDownLatch(numberOfThreads); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
try { try {
try { barrier.await();
startGate.await();
} catch (InterruptedException e) {
interrupted.set(true);
return;
}
Random random = new Random(random().nextLong()); Random random = new Random(random().nextLong());
for (int j = 0; j < numberOfEntries; j++) { for (int j = 0; j < numberOfEntries; j++) {
Integer key = random.nextInt(numberOfEntries); Integer key = random.nextInt(numberOfEntries);
@ -686,21 +679,23 @@ public class CacheTests extends ESTestCase {
cache.get(key); cache.get(key);
} }
} }
} finally { barrier.await();
endGate.countDown(); } catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
} }
}); });
thread.start(); thread.start();
} }
startGate.countDown(); // wait for all threads to be ready
endGate.await(); barrier.await();
assertFalse(interrupted.get()); // wait for all threads to finish
barrier.await();
} }
// test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key // test that the cache is not corrupted under lots of concurrent modifications, even hitting the same key
// here be dragons: this test did catch one subtle bug during development; do not remove lightly // here be dragons: this test did catch one subtle bug during development; do not remove lightly
public void testTorture() throws InterruptedException { public void testTorture() throws BrokenBarrierException, InterruptedException {
int numberOfThreads = randomIntBetween(2, 32); int numberOfThreads = randomIntBetween(2, 32);
final Cache<Integer, String> cache = final Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder() CacheBuilder.<Integer, String>builder()
@ -708,32 +703,28 @@ public class CacheTests extends ESTestCase {
.weigher((k, v) -> 2) .weigher((k, v) -> 2)
.build(); .build();
CountDownLatch startGate = new CountDownLatch(1); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
CountDownLatch endGate = new CountDownLatch(numberOfThreads);
AtomicBoolean interrupted = new AtomicBoolean();
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
try { try {
try { barrier.await();
startGate.await();
} catch (InterruptedException e) {
interrupted.set(true);
return;
}
Random random = new Random(random().nextLong()); Random random = new Random(random().nextLong());
for (int j = 0; j < numberOfEntries; j++) { for (int j = 0; j < numberOfEntries; j++) {
Integer key = random.nextInt(numberOfEntries); Integer key = random.nextInt(numberOfEntries);
cache.put(key, Integer.toString(j)); cache.put(key, Integer.toString(j));
} }
} finally { barrier.await();
endGate.countDown(); } catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
} }
}); });
thread.start(); thread.start();
} }
startGate.countDown();
endGate.await(); // wait for all threads to be ready
assertFalse(interrupted.get()); barrier.await();
// wait for all threads to finish
barrier.await();
cache.refresh(); cache.refresh();
assertEquals(500, cache.count()); assertEquals(500, cache.count());