Add a blocking mechanism that throttles ingestion once 2 persist batches are pending

This commit is contained in:
nishantmonu51 2014-01-17 03:01:04 +05:30
parent 26a0f862fb
commit 6f5e1afbff
3 changed files with 103 additions and 26 deletions

View File

@ -21,10 +21,15 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
@ -49,4 +54,31 @@ public class Execs
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}
/**
 * Creates a single-threaded executor with a bounded submission queue. Once
 * {@code capacity} tasks are queued, further submissions block the calling
 * thread until space frees up — back-pressure instead of unbounded queue
 * growth or task rejection.
 *
 * @param nameFormat name format for the thread factory (e.g. "worker_%d")
 * @param capacity   maximum number of queued tasks before submitters block
 * @return ExecutorService which blocks accepting new tasks when the capacity is reached
 */
public static ExecutorService blockingSingleThreaded(String nameFormat, int capacity)
{
  return new ThreadPoolExecutor(
      1, 1,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(capacity),
      makeThreadFactory(nameFormat),
      new RejectedExecutionHandler()
      {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
          try {
            // Queue is full: block the submitter until space is available
            // rather than rejecting the task. BlockingQueue.put() makes the
            // raw-type cast to ArrayBlockingQueue unnecessary.
            executor.getQueue().put(r);
          }
          catch (InterruptedException e) {
            // Restore the interrupt flag (we are swallowing the exception
            // type) and preserve the cause for callers diagnosing failures.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
          }
        }
      }
  );
}
}

View File

@ -0,0 +1,64 @@
package io.druid.concurrent;
import com.google.common.base.Throwables;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecsTest
{
  /**
   * Verifies that {@link Execs#blockingSingleThreaded} blocks producers once
   * the queue holds {@code capacity} tasks plus one running task.
   *
   * NOTE(review): the Thread.sleep(500) below makes this timing-dependent and
   * potentially flaky on a slow machine — consider a latch-based handshake.
   */
  @Test
  public void testBlockingExecutorService() throws Exception
  {
    final int capacity = 3;
    final ExecutorService executorService = Execs.blockingSingleThreaded("test%d", capacity);
    final AtomicInteger producedCount = new AtomicInteger();
    final AtomicInteger consumedCount = new AtomicInteger();
    final CyclicBarrier barrier = new CyclicBarrier(2);
    Thread producer = new Thread("producer")
    {
      @Override
      public void run()
      {
        for (int i = 0; i < 2 * capacity; i++) {
          final int taskID = i;
          System.out.println("Produced task" + taskID);
          executorService.submit(
              new Runnable()
              {
                @Override
                public void run()
                {
                  System.out.println("Starting task" + taskID);
                  try {
                    // Hold the worker thread until the test releases it, so
                    // the queue fills up and the producer blocks.
                    barrier.await();
                  }
                  catch (Exception e) {
                    throw Throwables.propagate(e);
                  }
                  consumedCount.incrementAndGet();
                  System.out.println("Completed task" + taskID);
                }
              }
          );
          producedCount.incrementAndGet();
        }
      }
    };
    producer.start();
    for (int i = 0; i < capacity; i++) {
      // Give the producer time to fill the queue and block.
      Thread.sleep(500);
      // total completed tasks + 1 running task + capacity queued = total produced tasks
      Assert.assertEquals(consumedCount.intValue() + 1 + capacity, producedCount.intValue());
      barrier.await();
    }
    // Release the remaining tasks so the producer can finish submitting.
    for (int i = 0; i < capacity; i++) {
      barrier.await();
    }
    // Clean up: make sure the producer finished, the executor drains, and
    // every submitted task actually ran.
    producer.join();
    executorService.shutdown();
    Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
    Assert.assertEquals(2 * capacity, producedCount.intValue());
    Assert.assertEquals(2 * capacity, consumedCount.intValue());
  }
}

View File

@ -31,7 +31,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
@ -42,6 +41,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
@ -84,7 +84,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -92,43 +91,33 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
// Immutable configuration supplied at construction time.
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
// Monitor used to signal hand-off completion; volatile flag guards shutdown.
private final Object handoffCondition = new Object();
private volatile boolean shuttingDown = false;
// Collaborators injected by Jackson after deserialization; volatile because
// injection happens on a different thread than the one that uses them.
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
// Query execution pool shared with the rest of the processing pipeline.
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
// Optional policies; defaulted elsewhere when left null.
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -485,22 +474,13 @@ public class RealtimePlumberSchool implements PlumberSchool
private void initializeExecutors()
{
if (persistExecutor == null) {
persistExecutor = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_persist_%d")
.build()
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.blockingSingleThreaded(
"plumber_persist_%d",2
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_scheduled_%d")
.build()
);
scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
}
}
@ -697,7 +677,8 @@ public class RealtimePlumberSchool implements PlumberSchool
/**
* Unannounces a given sink and removes all local references to it.
*/
private void abandonSegment(final long truncatedTime, final Sink sink) {
private void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));