diff --git a/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java index 3a261a7083f..8c7ce610679 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressionStrategyTest.java @@ -21,6 +21,10 @@ package io.druid.segment.data; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.segment.CompressedPools; import org.junit.Assert; import org.junit.Before; @@ -35,11 +39,13 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -158,26 +164,20 @@ public class CompressionStrategyTest } - @Test + @Test(timeout = 120000) public void testKnownSizeConcurrency() throws InterruptedException, ExecutionException, TimeoutException { final int numThreads = 20; - BlockingQueue queue = new ArrayBlockingQueue<>(numThreads); - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( - numThreads, - numThreads, - 100, - TimeUnit.MILLISECONDS, - queue - ); - Collection> results = new ArrayList<>(); + + ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + List> results = new ArrayList<>(); for (int i = 0; i < numThreads; ++i) { results.add( threadPoolExecutor.submit( - new Callable() + new Runnable() { @Override - public Boolean call() throws Exception + public void run() { ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData)); ByteBuffer output = ByteBuffer.allocate(originalData.length); @@ -187,15 +187,11 @@ public class CompressionStrategyTest byte[] checkArray = new byte[DATA_SIZER]; output.get(checkArray); Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); - return true; } } ) ); } - threadPoolExecutor.shutdown(); - for (Future result : results) { - Assert.assertTrue((Boolean) result.get(500, TimeUnit.MILLISECONDS)); - } + Futures.allAsList(results).get(); } }