Merge pull request #983 from druid-io/CompressionStrategyTestConcurencyExpansion

Lengthen CompressionStrategyTest::testKnownSizeConcurrency() timeout
This commit is contained in:
xvrl 2014-12-19 17:01:46 -08:00
commit 500c809bbe
1 changed files with 13 additions and 17 deletions

View File

@ -21,6 +21,10 @@ package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.segment.CompressedPools;
import org.junit.Assert;
import org.junit.Before;
@ -35,11 +39,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -158,26 +164,20 @@ public class CompressionStrategyTest
}
@Test
@Test(timeout = 120000)
public void testKnownSizeConcurrency() throws InterruptedException, ExecutionException, TimeoutException
{
final int numThreads = 20;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(numThreads);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
numThreads,
numThreads,
100,
TimeUnit.MILLISECONDS,
queue
);
Collection<Future<Boolean>> results = new ArrayList<>();
ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
List<ListenableFuture<?>> results = new ArrayList<>();
for (int i = 0; i < numThreads; ++i) {
results.add(
threadPoolExecutor.submit(
new Callable<Boolean>()
new Runnable()
{
@Override
public Boolean call() throws Exception
public void run()
{
ByteBuffer compressed = ByteBuffer.wrap(compressionStrategy.getCompressor().compress(originalData));
ByteBuffer output = ByteBuffer.allocate(originalData.length);
@ -187,15 +187,11 @@ public class CompressionStrategyTest
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
return true;
}
}
)
);
}
threadPoolExecutor.shutdown();
for (Future result : results) {
Assert.assertTrue((Boolean) result.get(500, TimeUnit.MILLISECONDS));
}
Futures.allAsList(results).get();
}
}