Increase timeout for BlockingPoolTest (#5959)

This commit is contained in:
Jihoon Son 2018-07-06 16:34:53 -07:00 committed by Gian Merlino
parent b3976050ad
commit d1d9358274
1 changed files with 82 additions and 73 deletions

View File

@ -21,7 +21,9 @@ package io.druid.collections;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.junit.AfterClass; import io.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -30,7 +32,6 @@ import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -40,26 +41,34 @@ import static org.junit.Assert.assertTrue;
public class BlockingPoolTest public class BlockingPoolTest
{ {
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2); private ExecutorService service;
private static final DefaultBlockingPool<Integer> POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10); private DefaultBlockingPool<Integer> pool;
private static final BlockingPool<Integer> EMPTY_POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0); private BlockingPool<Integer> emptyPool;
@AfterClass
public static void teardown()
{
SERVICE.shutdown();
}
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
service = Execs.multiThreaded(2, "blocking-pool-test");
pool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
emptyPool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
}
@After
public void teardown()
{
service.shutdownNow();
}
@Test @Test
public void testTakeFromEmptyPool() public void testTakeFromEmptyPool()
{ {
expectedException.expect(IllegalStateException.class); expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
EMPTY_POOL.take(0); emptyPool.take(0);
} }
@Test @Test
@ -67,49 +76,49 @@ public class BlockingPoolTest
{ {
expectedException.expect(IllegalStateException.class); expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
EMPTY_POOL.takeBatch(1, 0); emptyPool.takeBatch(1, 0);
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testTake() public void testTake()
{ {
final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100); final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
assertNotNull(holder); assertNotNull(holder);
assertEquals(9, POOL.getPoolSize()); assertEquals(9, pool.getPoolSize());
holder.close(); holder.close();
assertEquals(10, POOL.getPoolSize()); assertEquals(10, pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testTakeTimeout() public void testTakeTimeout()
{ {
final List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 100L); final List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 100L);
final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100); final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
assertNull(holder); assertNull(holder);
batchHolder.forEach(ReferenceCountingResourceHolder::close); batchHolder.forEach(ReferenceCountingResourceHolder::close);
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testTakeBatch() public void testTakeBatch()
{ {
final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(6, 100L); final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(6, 100L);
assertNotNull(holder); assertNotNull(holder);
assertEquals(6, holder.size()); assertEquals(6, holder.size());
assertEquals(4, POOL.getPoolSize()); assertEquals(4, pool.getPoolSize());
holder.forEach(ReferenceCountingResourceHolder::close); holder.forEach(ReferenceCountingResourceHolder::close);
assertEquals(10, POOL.getPoolSize()); assertEquals(10, pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException
{ {
List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 10); List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 10);
assertNotNull(batchHolder); assertNotNull(batchHolder);
assertEquals(10, batchHolder.size()); assertEquals(10, batchHolder.size());
assertEquals(0, POOL.getPoolSize()); assertEquals(0, pool.getPoolSize());
final Future<List<ReferenceCountingResourceHolder<Integer>>> future = SERVICE.submit( final Future<List<ReferenceCountingResourceHolder<Integer>>> future = service.submit(
() -> POOL.takeBatch(8, 100) () -> pool.takeBatch(8, 100)
); );
Thread.sleep(20); Thread.sleep(20);
batchHolder.forEach(ReferenceCountingResourceHolder::close); batchHolder.forEach(ReferenceCountingResourceHolder::close);
@ -117,26 +126,26 @@ public class BlockingPoolTest
batchHolder = future.get(); batchHolder = future.get();
assertNotNull(batchHolder); assertNotNull(batchHolder);
assertEquals(8, batchHolder.size()); assertEquals(8, batchHolder.size());
assertEquals(2, POOL.getPoolSize()); assertEquals(2, pool.getPoolSize());
batchHolder.forEach(ReferenceCountingResourceHolder::close); batchHolder.forEach(ReferenceCountingResourceHolder::close);
assertEquals(10, POOL.getPoolSize()); assertEquals(10, pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testTakeBatchTooManyObjects() public void testTakeBatchTooManyObjects()
{ {
final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(100, 100L); final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(100, 100L);
assertTrue(holder.isEmpty()); assertTrue(holder.isEmpty());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testConcurrentTake() throws ExecutionException, InterruptedException public void testConcurrentTake() throws ExecutionException, InterruptedException
{ {
final int limit1 = POOL.maxSize() / 2; final int limit1 = pool.maxSize() / 2;
final int limit2 = POOL.maxSize() - limit1 + 1; final int limit2 = pool.maxSize() - limit1 + 1;
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit( final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>() new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{ {
@Override @Override
@ -144,13 +153,13 @@ public class BlockingPoolTest
{ {
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList(); List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit1; i++) { for (int i = 0; i < limit1; i++) {
result.add(POOL.take(10)); result.add(pool.take(10));
} }
return result; return result;
} }
} }
); );
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit( final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>() new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{ {
@Override @Override
@ -158,7 +167,7 @@ public class BlockingPoolTest
{ {
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList(); List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit2; i++) { for (int i = 0; i < limit2; i++) {
result.add(POOL.take(10)); result.add(pool.take(10));
} }
return result; return result;
} }
@ -168,7 +177,7 @@ public class BlockingPoolTest
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get(); final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get(); final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
assertEquals(0, POOL.getPoolSize()); assertEquals(0, pool.getPoolSize());
assertTrue(r1.contains(null) || r2.contains(null)); assertTrue(r1.contains(null) || r2.contains(null));
int nonNullCount = 0; int nonNullCount = 0;
@ -183,9 +192,9 @@ public class BlockingPoolTest
nonNullCount++; nonNullCount++;
} }
} }
assertEquals(POOL.maxSize(), nonNullCount); assertEquals(pool.maxSize(), nonNullCount);
final Future future1 = SERVICE.submit(new Runnable() final Future future1 = service.submit(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -197,7 +206,7 @@ public class BlockingPoolTest
} }
} }
}); });
final Future future2 = SERVICE.submit(new Runnable() final Future future2 = service.submit(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -213,50 +222,50 @@ public class BlockingPoolTest
future1.get(); future1.get();
future2.get(); future2.get();
assertEquals(POOL.maxSize(), POOL.getPoolSize()); assertEquals(pool.maxSize(), pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException
{ {
final int batch1 = POOL.maxSize() / 2; final int batch1 = pool.maxSize() / 2;
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10); final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> pool.takeBatch(batch1, 10);
final int batch2 = POOL.maxSize() - batch1 + 1; final int batch2 = pool.maxSize() - batch1 + 1;
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10); final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(batch2, 10);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1); final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(c1);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2); final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get(); final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get(); final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
if (r1 != null) { if (r1 != null) {
assertTrue(r2.isEmpty()); assertTrue(r2.isEmpty());
assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize()); assertEquals(pool.maxSize() - batch1, pool.getPoolSize());
assertEquals(batch1, r1.size()); assertEquals(batch1, r1.size());
r1.forEach(ReferenceCountingResourceHolder::close); r1.forEach(ReferenceCountingResourceHolder::close);
} else { } else {
assertNotNull(r2); assertNotNull(r2);
assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize()); assertEquals(pool.maxSize() - batch2, pool.getPoolSize());
assertEquals(batch2, r2.size()); assertEquals(batch2, r2.size());
r2.forEach(ReferenceCountingResourceHolder::close); r2.forEach(ReferenceCountingResourceHolder::close);
} }
assertEquals(POOL.maxSize(), POOL.getPoolSize()); assertEquals(pool.maxSize(), pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testConcurrentBatchClose() throws ExecutionException, InterruptedException public void testConcurrentBatchClose() throws ExecutionException, InterruptedException
{ {
final int batch1 = POOL.maxSize() / 2; final int batch1 = pool.maxSize() / 2;
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10); final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> pool.takeBatch(batch1, 10);
final int batch2 = POOL.maxSize() - batch1; final int batch2 = pool.maxSize() - batch1;
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10); final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(batch2, 10);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1); final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(c1);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2); final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get(); final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get(); final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
@ -265,9 +274,9 @@ public class BlockingPoolTest
assertNotNull(r2); assertNotNull(r2);
assertEquals(batch1, r1.size()); assertEquals(batch1, r1.size());
assertEquals(batch2, r2.size()); assertEquals(batch2, r2.size());
assertEquals(0, POOL.getPoolSize()); assertEquals(0, pool.getPoolSize());
final Future future1 = SERVICE.submit(new Runnable() final Future future1 = service.submit(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -275,7 +284,7 @@ public class BlockingPoolTest
r1.forEach(ReferenceCountingResourceHolder::close); r1.forEach(ReferenceCountingResourceHolder::close);
} }
}); });
final Future future2 = SERVICE.submit(new Runnable() final Future future2 = service.submit(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -287,18 +296,18 @@ public class BlockingPoolTest
future1.get(); future1.get();
future2.get(); future2.get();
assertEquals(POOL.maxSize(), POOL.getPoolSize()); assertEquals(pool.maxSize(), pool.getPoolSize());
} }
@Test(timeout = 1000) @Test(timeout = 5000)
public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException
{ {
final List<ReferenceCountingResourceHolder<Integer>> r1 = POOL.takeBatch(1, 10); final List<ReferenceCountingResourceHolder<Integer>> r1 = pool.takeBatch(1, 10);
final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(10, 100); final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(10, 100);
final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2); final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
final Future f1 = SERVICE.submit(new Runnable() final Future f1 = service.submit(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -317,9 +326,9 @@ public class BlockingPoolTest
f1.get(); f1.get();
assertNotNull(r2); assertNotNull(r2);
assertEquals(10, r2.size()); assertEquals(10, r2.size());
assertEquals(0, POOL.getPoolSize()); assertEquals(0, pool.getPoolSize());
r2.forEach(ReferenceCountingResourceHolder::close); r2.forEach(ReferenceCountingResourceHolder::close);
assertEquals(POOL.maxSize(), POOL.getPoolSize()); assertEquals(pool.maxSize(), pool.getPoolSize());
} }
} }