diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index ea27bf96e05..e522ce10cf4 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -21,10 +21,15 @@ package io.druid.concurrent; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** */ @@ -49,4 +54,31 @@ public class Execs { return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build(); } + + /** + * @param nameFormat nameformat for threadFactory + * @param capacity maximum capacity after which the executorService will block on accepting new tasks + * @return ExecutorService which blocks accepting new tasks when the capacity reached + */ + public static ExecutorService blockingSingleThreaded(String nameFormat, int capacity) + { + return new ThreadPoolExecutor( + 1, 1, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(capacity), makeThreadFactory(nameFormat) + , new RejectedExecutionHandler() + { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) + { + try { + ((ArrayBlockingQueue) executor.getQueue()).put(r); + } + catch (InterruptedException e) { + throw new RejectedExecutionException("Got Interrupted while adding to the Queue"); + } + } + } + ); + } } diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java new file mode 100644 index 00000000000..04e842580dc --- /dev/null +++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java @@ -0,0 +1,64 @@ +package io.druid.concurrent; + +import com.google.common.base.Throwables; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +public class ExecsTest +{ + @Test + public void testBlockingExecutorService() throws Exception + { + final int capacity = 3; + final ExecutorService executorService = Execs.blockingSingleThreaded("test%d", capacity); + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger consumedCount = new AtomicInteger(); + final CyclicBarrier barrier = new CyclicBarrier(2); + Thread producer = new Thread("producer") + { + public void run() + { + for (int i = 0; i < 2 * capacity; i++) { + final int taskID = i; + System.out.println("Produced task"+ taskID); + executorService.submit( + new Runnable() + { + @Override + public void run() + { + System.out.println("Starting task" + taskID); + try { + barrier.await(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + consumedCount.incrementAndGet(); + System.out.println("Completed task" + taskID); + } + } + ); + producedCount.incrementAndGet(); + } + } + }; + producer.start(); + for(int i=0;i