Merge pull request #419 from metamx/mpp

Blocking Executors and maxPendingPersists, oh my!
This commit is contained in:
fjy 2014-03-05 11:57:12 -07:00
commit 1ab11b1f74
5 changed files with 76 additions and 35 deletions

View File

@ -22,11 +22,13 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -60,25 +62,29 @@ public class Execs
* @param capacity maximum capacity after which the executorService will block on accepting new tasks
* @return ExecutorService which blocks accepting new tasks when the capacity reached
*/
public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
return new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat)
, new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
((ArrayBlockingQueue) executor.getQueue()).put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
final BlockingQueue<Runnable> queue;
if (capacity > 0) {
queue = new ArrayBlockingQueue<>(capacity);
} else {
queue = new SynchronousQueue<>();
}
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
}
);
}
}

View File

@ -20,6 +20,8 @@
package io.druid.concurrent;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.logger.Logger;
import org.junit.Assert;
import org.junit.Test;
@ -30,23 +32,46 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
private static final Logger log = new Logger(ExecsTest.class);
@Test
public void testBlockingExecutorService() throws Exception
public void testBlockingExecutorServiceZeroCapacity() throws Exception
{
final int capacity = 3;
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
runTest(0);
}
@Test
public void testBlockingExecutorServiceOneCapacity() throws Exception
{
runTest(1);
}
@Test
public void testBlockingExecutorServiceThreeCapacity() throws Exception
{
runTest(3);
}
private static void runTest(final int capacity) throws Exception
{
final int nTasks = (capacity + 1) * 3;
final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("ExecsTest-Blocking-%d", capacity);
final CountDownLatch queueShouldBeFullSignal = new CountDownLatch(capacity + 1);
final CountDownLatch taskCompletedSignal = new CountDownLatch(nTasks);
final CountDownLatch taskStartSignal = new CountDownLatch(1);
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger consumedCount = new AtomicInteger();
ExecutorService producer = Executors.newSingleThreadExecutor();
final ExecutorService producer = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"ExecsTest-Producer-%d"
).build()
);
producer.submit(
new Runnable()
{
public void run()
{
for (int i = 0; i < 2 * capacity; i++) {
for (int i = 0; i < nTasks; i++) {
final int taskID = i;
System.out.println("Produced task" + taskID);
blockingExecutor.submit(
@ -55,7 +80,7 @@ public class ExecsTest
@Override
public void run()
{
System.out.println("Starting task" + taskID);
log.info("Starting task: %s", taskID);
try {
taskStartSignal.await();
consumedCount.incrementAndGet();
@ -64,29 +89,31 @@ public class ExecsTest
catch (Exception e) {
throw Throwables.propagate(e);
}
System.out.println("Completed task" + taskID);
log.info("Completed task: %s", taskID);
}
}
);
producedCount.incrementAndGet();
queueFullSignal.countDown();
queueShouldBeFullSignal.countDown();
}
}
}
);
queueFullSignal.await();
// verify that the producer blocks
queueShouldBeFullSignal.await();
// Verify that the producer blocks. I don't think it's possible to be sure that the producer is blocking (since
// it could be doing nothing for any reason). But waiting a short period of time and checking that it hasn't done
// anything should hopefully be sufficient.
Thread.sleep(500);
Assert.assertEquals(capacity + 1, producedCount.get());
// let the tasks run
taskStartSignal.countDown();
// wait until all tasks complete
taskCompletedSignal.await();
// verify all tasks consumed
Assert.assertEquals(2 * capacity, consumedCount.get());
Assert.assertEquals(nTasks, consumedCount.get());
// cleanup
blockingExecutor.shutdown();
producer.shutdown();
}
}

View File

@ -109,7 +109,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
@ -139,7 +139,7 @@ public class RealtimeIndexTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.maxPendingPersists = (maxPendingPersists == 0)
this.maxPendingPersists = (maxPendingPersists == null)
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists;
this.segmentGranularity = segmentGranularity;
@ -398,6 +398,12 @@ public class RealtimeIndexTask extends AbstractTask
return windowPeriod;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty
public IndexGranularity getSegmentGranularity()
{

View File

@ -198,7 +198,7 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
1,
5,
IndexGranularity.HOUR,
null
);
@ -214,6 +214,7 @@ public class TaskSerdeTest
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod());
Assert.assertEquals(IndexGranularity.HOUR, task.getSegmentGranularity());
Assert.assertEquals(5, task.getMaxPendingPersists());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
@ -222,6 +223,7 @@ public class TaskSerdeTest
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getWindowPeriod(), task2.getWindowPeriod());
Assert.assertEquals(task.getSegmentGranularity(), task2.getSegmentGranularity());
Assert.assertEquals(task.getMaxPendingPersists(), task2.getMaxPendingPersists());
}
@Test

View File

@ -44,7 +44,7 @@ import java.util.concurrent.ExecutorService;
*/
public class RealtimePlumberSchool implements PlumberSchool
{
public static final int DEFAULT_MAX_PENDING_PERSISTS = 2;
public static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);