Fix occasionally hanging test moving away from timeouts.

Fixes EsExecutorTests to use latches and a busy wait util from
ElasticsearchTestCase. This commit also adds some minor randomization
to the test.
This commit is contained in:
Simon Willnauer 2013-08-05 09:46:26 +02:00
parent 094c10d62d
commit 539ffb9ef5
1 changed files with 34 additions and 21 deletions

View File

@ -19,35 +19,43 @@
package org.elasticsearch.test.unit.common.util.concurrent;
import com.google.common.base.Predicate;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.elasticsearch.test.integration.ElasticsearchTestCase;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
/**
*/
public class EsExecutorsTests {
public class EsExecutorsTests extends ElasticsearchTestCase {
private TimeUnit randomTimeUnit() {
return TimeUnit.values()[between(0, TimeUnit.values().length-1)];
}
@Test
public void testScaleUp() throws Exception {
final int min = 2;
final int max = 4;
final int min = between(1, 3);
final int max = between(min+1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 100, TimeUnit.DAYS, EsExecutors.daemonThreadFactory("test"));
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
for (int i = 0; i < max; ++i) {
final CountDownLatch latch = new CountDownLatch(1);
pool.execute(new Runnable() {
public void run() {
latch.countDown();
try {
barrier.await();
barrier.await();
@ -59,7 +67,7 @@ public class EsExecutorsTests {
//wait until thread executes this task
//otherwise, a task might be queued
Thread.sleep(100);
latch.await();
}
barrier.await();
@ -71,17 +79,19 @@ public class EsExecutorsTests {
@Test
public void testScaleDown() throws Exception {
final int min = 2;
final int max = 4;
final int min = between(1, 3);
final int max = between(min+1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, 10, TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
final ThreadPoolExecutor pool = EsExecutors.newScalingExecutorService(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
for (int i = 0; i < max; ++i) {
final CountDownLatch latch = new CountDownLatch(1);
pool.execute(new Runnable() {
public void run() {
latch.countDown();
try {
barrier.await();
barrier.await();
@ -93,18 +103,21 @@ public class EsExecutorsTests {
//wait until thread executes this task
//otherwise, a task might be queued
Thread.sleep(100);
latch.await();
}
barrier.await();
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
barrier.await();
Thread.sleep(1000);
// assertThat("not all tasks completed", pool.getCompletedTaskCount(), equalTo((long) max));
awaitBusy(new Predicate<Object>() {
public boolean apply(Object o) {
return pool.getActiveCount() == 0 && pool.getPoolSize() < max;
}
});
//assertThat("not all tasks completed", pool.getCompletedTaskCount(), equalTo((long) max));
assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
//Assert.assertEquals("wrong pool size. ", min, pool.getPoolSize()); //BUG in ThreadPool - Bug ID: 6458662
//assertThat("wrong pool size. ", min, equalTo(pool.getPoolSize())); //BUG in ThreadPool - Bug ID: 6458662
//assertThat("idle threads didn't stay above min (" + pool.getPoolSize() + ")", pool.getPoolSize(), greaterThan(0));
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
pool.shutdown();
@ -113,18 +126,19 @@ public class EsExecutorsTests {
@Test
public void testBlocking() throws Exception {
final int min = 2;
final int max = 4;
final long waitTime = 1000; //1 second
final int min = between(1, 3);
final int max = between(min+1, 6);
final long waitTime = between(1000, 2000); //1 second
final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newBlockingExecutorService(min, max, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), 1, waitTime, TimeUnit.MILLISECONDS);
ThreadPoolExecutor pool = EsExecutors.newBlockingExecutorService(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), 1, waitTime, TimeUnit.MILLISECONDS);
assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
for (int i = 0; i < max; ++i) {
final CountDownLatch latch = new CountDownLatch(1);
pool.execute(new Runnable() {
public void run() {
latch.countDown();
try {
barrier.await();
barrier.await();
@ -133,10 +147,9 @@ public class EsExecutorsTests {
}
}
});
//wait until thread executes this task
//otherwise, a task might be queued
Thread.sleep(100);
latch.await();
}
barrier.await();