diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index e522ce10cf4..308208ef98d 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -60,7 +60,7 @@ public class Execs * @param capacity maximum capacity after which the executorService will block on accepting new tasks * @return ExecutorService which blocks accepting new tasks when the capacity reached */ - public static ExecutorService blockingSingleThreaded(String nameFormat, int capacity) + public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity) { return new ThreadPoolExecutor( 1, 1, diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java index 04e842580dc..6d4452269ee 100644 --- a/common/src/test/java/io/druid/concurrent/ExecsTest.java +++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.concurrent; import com.google.common.base.Throwables; @@ -5,7 +24,9 @@ import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class ExecsTest @@ -14,50 +35,59 @@ public class ExecsTest public void testBlockingExecutorService() throws Exception { final int capacity = 3; - final ExecutorService executorService = Execs.blockingSingleThreaded("test%d", capacity); + final ExecutorService executorService = Execs.newBlockingSingleThreaded("test%d", capacity); final AtomicInteger producedCount = new AtomicInteger(); final AtomicInteger consumedCount = new AtomicInteger(); final CyclicBarrier barrier = new CyclicBarrier(2); - Thread producer = new Thread("producer") - { - public void run() - { - for (int i = 0; i < 2 * capacity; i++) { - final int taskID = i; - System.out.println("Produced task"+ taskID); - executorService.submit( - new Runnable() - { - @Override - public void run() - { - System.out.println("Starting task" + taskID); - try { - barrier.await(); + ExecutorService producer = Executors.newCachedThreadPool(); + + producer.submit( + new Runnable() + { + public void run() + { + for (int i = 0; i < 2 * capacity; i++) { + final int taskID = i; + System.out.println("Produced task" + taskID); + executorService.submit( + new Runnable() + { + @Override + public void run() + { + System.out.println("Starting task" + taskID); + try { + consumedCount.incrementAndGet(); + barrier.await(); //1 + barrier.await(); //2 + } + catch (Exception e) { + throw Throwables.propagate(e); + } + System.out.println("Completed task" + taskID); + } } - catch (Exception e) { - throw Throwables.propagate(e); - } - consumedCount.incrementAndGet(); - System.out.println("Completed task" + taskID); - } - } - ); - producedCount.incrementAndGet(); + ); + producedCount.incrementAndGet(); + } + } } - } - }; - producer.start(); + ); + while(producedCount.get() < capacity ){ + Thread.sleep(5); + } + for(int i=0;i