Blocking Executors and maxPendingPersists, oh my!

- Execs.newBlockingSingleThreaded can now accept capacity = 0.
- Changed default maxPendingPersists from 2 to 0.
- Fixed serde of maxPendingPersists in RealtimeIndexTasks.
This commit is contained in:
Gian Merlino 2014-03-05 10:55:12 -08:00
parent 3691d40240
commit 70db460f97
5 changed files with 76 additions and 35 deletions

View File

@ -22,11 +22,13 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -60,25 +62,29 @@ public class Execs
* @param capacity maximum capacity after which the executorService will block on accepting new tasks * @param capacity maximum capacity after which the executorService will block on accepting new tasks
* @return ExecutorService which blocks accepting new tasks when the capacity reached * @return ExecutorService which blocks accepting new tasks when the capacity reached
*/ */
public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity) public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{ {
return new ThreadPoolExecutor( final BlockingQueue<Runnable> queue;
1, 1, if (capacity > 0) {
0L, TimeUnit.MILLISECONDS, queue = new ArrayBlockingQueue<>(capacity);
new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat) } else {
, new RejectedExecutionHandler() queue = new SynchronousQueue<>();
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
((ArrayBlockingQueue) executor.getQueue()).put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
} }
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
}
); );
} }
} }

View File

@ -20,6 +20,8 @@
package io.druid.concurrent; package io.druid.concurrent;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.logger.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest public class ExecsTest
{ {
private static final Logger log = new Logger(ExecsTest.class);
@Test @Test
public void testBlockingExecutorService() throws Exception public void testBlockingExecutorServiceZeroCapacity() throws Exception
{ {
final int capacity = 3; runTest(0);
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity); }
final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity); @Test
public void testBlockingExecutorServiceOneCapacity() throws Exception
{
runTest(1);
}
@Test
public void testBlockingExecutorServiceThreeCapacity() throws Exception
{
runTest(3);
}
private static void runTest(final int capacity) throws Exception
{
final int nTasks = (capacity + 1) * 3;
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity);
final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks);
final CountDownLatch taskStartSignal = new CountDownLatch(1); final CountDownLatch taskStartSignal = new CountDownLatch(1);
final AtomicInteger producedCount = new AtomicInteger(); final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger consumedCount = new AtomicInteger(); final AtomicInteger consumedCount = new AtomicInteger();
ExecutorService producer = Executors.newSingleThreadExecutor(); final ExecutorService producer = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"ExecsTest-Producer-%d"
).build()
);
producer.submit( producer.submit(
new Runnable() new Runnable()
{ {
public void run() public void run()
{ {
for (int i = 0; i < 2 * capacity; i++) { for (int i = 0; i < nTasks; i++) {
final int taskID = i; final int taskID = i;
System.out.println("Produced task" + taskID); System.out.println("Produced task" + taskID);
blockingExecutor.submit( blockingExecutor.submit(
@ -55,7 +80,7 @@ public class ExecsTest
@Override @Override
public void run() public void run()
{ {
System.out.println("Starting task" + taskID); log.info("Starting task: %s", taskID);
try { try {
taskStartSignal.await(); taskStartSignal.await();
consumedCount.incrementAndGet(); consumedCount.incrementAndGet();
@ -64,29 +89,31 @@ public class ExecsTest
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
System.out.println("Completed task" + taskID); log.info("Completed task: %s", taskID);
} }
} }
); );
producedCount.incrementAndGet(); producedCount.incrementAndGet();
queueFullSignal.countDown(); queueShouldBeFullSignal.countDown();
} }
} }
} }
); );
queueFullSignal.await(); queueShouldBeFullSignal.await();
// verify that the producer blocks // Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since
// it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done
// anything should hopefully be sufficient.
Thread.sleep(500);
Assert.assertEquals(capacity + 1, producedCount.get()); Assert.assertEquals(capacity + 1, producedCount.get());
// let the tasks run // let the tasks run
taskStartSignal.countDown(); taskStartSignal.countDown();
// wait until all tasks complete // wait until all tasks complete
taskCompletedSignal.await(); taskCompletedSignal.await();
// verify all tasks consumed // verify all tasks consumed
Assert.assertEquals(2 * capacity, consumedCount.get()); Assert.assertEquals(nTasks, consumedCount.get());
// cleanup // cleanup
blockingExecutor.shutdown(); blockingExecutor.shutdown();
producer.shutdown(); producer.shutdown();
} }
} }

View File

@ -109,7 +109,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists, @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
) )
@ -139,7 +139,7 @@ public class RealtimeIndexTask extends AbstractTask
this.firehoseFactory = firehoseFactory; this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig; this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod; this.windowPeriod = windowPeriod;
this.maxPendingPersists = (maxPendingPersists == 0) this.maxPendingPersists = (maxPendingPersists == null)
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS ? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists; : maxPendingPersists;
this.segmentGranularity = segmentGranularity; this.segmentGranularity = segmentGranularity;
@ -398,6 +398,12 @@ public class RealtimeIndexTask extends AbstractTask
return windowPeriod; return windowPeriod;
} }
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty @JsonProperty
public IndexGranularity getSegmentGranularity() public IndexGranularity getSegmentGranularity()
{ {

View File

@ -198,7 +198,7 @@ public class TaskSerdeTest
null, null,
null, null,
new Period("PT10M"), new Period("PT10M"),
1, 5,
IndexGranularity.HOUR, IndexGranularity.HOUR,
null null
); );
@ -214,6 +214,7 @@ public class TaskSerdeTest
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup()); Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod()); Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod());
Assert.assertEquals(IndexGranularity.HOUR, task.getSegmentGranularity()); Assert.assertEquals(IndexGranularity.HOUR, task.getSegmentGranularity());
Assert.assertEquals(5, task.getMaxPendingPersists());
Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId());
@ -222,6 +223,7 @@ public class TaskSerdeTest
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup()); Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getWindowPeriod(), task2.getWindowPeriod()); Assert.assertEquals(task.getWindowPeriod(), task2.getWindowPeriod());
Assert.assertEquals(task.getSegmentGranularity(), task2.getSegmentGranularity()); Assert.assertEquals(task.getSegmentGranularity(), task2.getSegmentGranularity());
Assert.assertEquals(task.getMaxPendingPersists(), task2.getMaxPendingPersists());
} }
@Test @Test

View File

@ -44,7 +44,7 @@ import java.util.concurrent.ExecutorService;
*/ */
public class RealtimePlumberSchool implements PlumberSchool public class RealtimePlumberSchool implements PlumberSchool
{ {
public static final int DEFAULT_MAX_PENDING_PERSISTS = 2; public static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class); private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);