diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index 308208ef98d..66af2a196ba 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -22,11 +22,13 @@ package io.druid.concurrent; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -60,25 +62,29 @@ public class Execs * @param capacity maximum capacity after which the executorService will block on accepting new tasks * @return ExecutorService which blocks accepting new tasks when the capacity reached */ - public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity) + public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity) { - return new ThreadPoolExecutor( - 1, 1, - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(capacity), makeThreadFactory(nameFormat) - , new RejectedExecutionHandler() - { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) - { - try { - ((ArrayBlockingQueue) executor.getQueue()).put(r); - } - catch (InterruptedException e) { - throw new RejectedExecutionException("Got Interrupted while adding to the Queue"); - } - } + final BlockingQueue queue; + if (capacity > 0) { + queue = new ArrayBlockingQueue<>(capacity); + } else { + queue = new SynchronousQueue<>(); } + return new ThreadPoolExecutor( + 1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat), + new RejectedExecutionHandler() + { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) + { + try { + executor.getQueue().put(r); + } + catch (InterruptedException e) { + throw new RejectedExecutionException("Got Interrupted while adding to the Queue"); + } + } + } ); } } diff --git a/common/src/test/java/io/druid/concurrent/ExecsTest.java b/common/src/test/java/io/druid/concurrent/ExecsTest.java index 809ed5eac02..ae2b0e15473 100644 --- a/common/src/test/java/io/druid/concurrent/ExecsTest.java +++ b/common/src/test/java/io/druid/concurrent/ExecsTest.java @@ -20,6 +20,8 @@ package io.druid.concurrent; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.logger.Logger; import org.junit.Assert; import org.junit.Test; @@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger; public class ExecsTest { + private static final Logger log = new Logger(ExecsTest.class); + @Test - public void testBlockingExecutorService() throws Exception + public void testBlockingExecutorServiceZeroCapacity() throws Exception { - final int capacity = 3; - final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity); - final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1); - final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity); + runTest(0); + } + + @Test + public void testBlockingExecutorServiceOneCapacity() throws Exception + { + runTest(1); + } + + @Test + public void testBlockingExecutorServiceThreeCapacity() throws Exception + { + runTest(3); + } + + private static void runTest(final int capacity) throws Exception + { + final int nTasks = (capacity + 1) * 3; + final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity); + final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1); + final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks); final CountDownLatch taskStartSignal = new CountDownLatch(1); final AtomicInteger producedCount = new AtomicInteger(); final AtomicInteger consumedCount = new AtomicInteger(); - ExecutorService producer = Executors.newSingleThreadExecutor(); + final ExecutorService producer = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat( + "ExecsTest-Producer-%d" + ).build() + ); producer.submit( new Runnable() { public void run() { - for (int i = 0; i < 2 * capacity; i++) { + for (int i = 0; i < nTasks; i++) { final int taskID = i; System.out.println("Produced task" + taskID); blockingExecutor.submit( @@ -55,7 +80,7 @@ public class ExecsTest @Override public void run() { - System.out.println("Starting task" + taskID); + log.info("Starting task: %s", taskID); try { taskStartSignal.await(); consumedCount.incrementAndGet(); @@ -64,29 +89,31 @@ public class ExecsTest catch (Exception e) { throw Throwables.propagate(e); } - System.out.println("Completed task" + taskID); + log.info("Completed task: %s", taskID); } } ); producedCount.incrementAndGet(); - queueFullSignal.countDown(); + queueShouldBeFullSignal.countDown(); } } } ); - queueFullSignal.await(); - // verify that the producer blocks + queueShouldBeFullSignal.await(); + // Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since + // it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done + // anything should hopefully be sufficient. + Thread.sleep(500); Assert.assertEquals(capacity + 1, producedCount.get()); // let the tasks run taskStartSignal.countDown(); // wait until all tasks complete taskCompletedSignal.await(); // verify all tasks consumed - Assert.assertEquals(2 * capacity, consumedCount.get()); + Assert.assertEquals(nTasks, consumedCount.get()); // cleanup blockingExecutor.shutdown(); producer.shutdown(); - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 7a40035c3e6..09172da3a4d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -109,7 +109,7 @@ public class RealtimeIndexTask extends AbstractTask @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @JsonProperty("windowPeriod") Period windowPeriod, - @JsonProperty("maxPendingPersists") int maxPendingPersists, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory ) @@ -139,7 +139,7 @@ public class RealtimeIndexTask extends AbstractTask this.firehoseFactory = firehoseFactory; this.fireDepartmentConfig = fireDepartmentConfig; this.windowPeriod = windowPeriod; - this.maxPendingPersists = (maxPendingPersists == 0) + this.maxPendingPersists = (maxPendingPersists == null) ? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.segmentGranularity = segmentGranularity; @@ -398,6 +398,12 @@ public class RealtimeIndexTask extends AbstractTask return windowPeriod; } + @JsonProperty + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + @JsonProperty public IndexGranularity getSegmentGranularity() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index b75169c8b9a..ba5afb04d3b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -198,7 +198,7 @@ public class TaskSerdeTest null, null, new Period("PT10M"), - 1, + 5, IndexGranularity.HOUR, null ); @@ -214,6 +214,7 @@ public class TaskSerdeTest Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup()); Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod()); Assert.assertEquals(IndexGranularity.HOUR, task.getSegmentGranularity()); + Assert.assertEquals(5, task.getMaxPendingPersists()); Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); @@ -222,6 +223,7 @@ public class TaskSerdeTest Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup()); Assert.assertEquals(task.getWindowPeriod(), task2.getWindowPeriod()); Assert.assertEquals(task.getSegmentGranularity(), task2.getSegmentGranularity()); + Assert.assertEquals(task.getMaxPendingPersists(), task2.getMaxPendingPersists()); } @Test diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index f7d6398a194..4a8332137d4 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -44,7 +44,7 @@ import java.util.concurrent.ExecutorService; */ public class RealtimePlumberSchool implements PlumberSchool { - public static final int DEFAULT_MAX_PENDING_PERSISTS = 2; + public static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);