Merge pull request #361 from metamx/worker-oome-fix

Worker oome fix
This commit is contained in:
fjy 2014-01-20 11:34:45 -08:00
commit c1f0027a36
7 changed files with 152 additions and 31 deletions

View File

@ -21,10 +21,15 @@ package io.druid.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*/
@ -49,4 +54,31 @@ public class Execs
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}
/**
 * Creates a single-threaded {@link ExecutorService} backed by a bounded queue. Instead of rejecting a task
 * when the queue is full, the submitting thread blocks until space becomes available, which throttles
 * producers that are faster than the single worker thread.
 *
 * @param nameFormat name format passed to {@code makeThreadFactory} for the worker thread
 * @param capacity   size of the bounded task queue; once it is full, submitting another task blocks the caller
 *
 * @return ExecutorService which blocks accepting new tasks when the capacity is reached
 *
 * @throws IllegalArgumentException if {@code capacity} is less than 1 (thrown by {@link ArrayBlockingQueue})
 */
public static ExecutorService newBlockingSingleThreaded(String nameFormat, int capacity)
{
  return new ThreadPoolExecutor(
      1, 1,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(capacity), makeThreadFactory(nameFormat),
      new RejectedExecutionHandler()
      {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
        {
          // A task queued after shutdown may never be picked up by a worker, leaving its Future
          // incomplete forever; reject it the way the default handler would.
          if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
          }
          try {
            // getQueue() already exposes a BlockingQueue<Runnable>; put() blocks until there is room.
            executor.getQueue().put(r);
          }
          catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe the interruption,
            // and keep the original exception as the cause.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
          }
        }
      }
  );
}
}

View File

@ -0,0 +1,92 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.concurrent;
import com.google.common.base.Throwables;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tests for the executor factories in {@link Execs}.
 */
public class ExecsTest
{
  /**
   * Verifies that the executor returned by {@link Execs#newBlockingSingleThreaded} blocks the submitting
   * thread once one task is running and {@code capacity} tasks are queued, and that every submitted task
   * eventually runs once the worker is released.
   */
  @Test
  public void testBlockingExecutorService() throws Exception
  {
    final int capacity = 3;
    final ExecutorService blockingExecutor = Execs.newBlockingSingleThreaded("test%d", capacity);
    // reaches zero once capacity + 1 submissions have been accepted (1 running + capacity queued)
    final CountDownLatch queueFullSignal = new CountDownLatch(capacity + 1);
    final CountDownLatch taskCompletedSignal = new CountDownLatch(2 * capacity);
    // keeps every task parked until the test has checked that the producer is blocked
    final CountDownLatch taskStartSignal = new CountDownLatch(1);
    final AtomicInteger producedCount = new AtomicInteger();
    final AtomicInteger consumedCount = new AtomicInteger();
    final ExecutorService producer = Executors.newSingleThreadExecutor();
    try {
      producer.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              for (int i = 0; i < 2 * capacity; i++) {
                final int taskID = i;
                System.out.println("Produced task" + taskID);
                blockingExecutor.submit(
                    new Runnable()
                    {
                      @Override
                      public void run()
                      {
                        System.out.println("Starting task" + taskID);
                        try {
                          taskStartSignal.await();
                          consumedCount.incrementAndGet();
                          taskCompletedSignal.countDown();
                        }
                        catch (Exception e) {
                          throw Throwables.propagate(e);
                        }
                        System.out.println("Completed task" + taskID);
                      }
                    }
                );
                producedCount.incrementAndGet();
                queueFullSignal.countDown();
              }
            }
          }
      );
      queueFullSignal.await();
      // verify that the producer blocks
      Assert.assertEquals(capacity + 1, producedCount.get());
      // let the tasks run
      taskStartSignal.countDown();
      // wait until all tasks complete
      taskCompletedSignal.await();
      // verify all tasks consumed
      Assert.assertEquals(2 * capacity, consumedCount.get());
    }
    finally {
      // cleanup: always stop both executors, even when an assertion fails, so that the non-daemon
      // producer thread is interrupted instead of staying blocked inside submit() forever
      blockingExecutor.shutdownNow();
      producer.shutdownNow();
    }
  }
}

View File

@ -194,7 +194,7 @@ public class RealtimeIndexTask extends AbstractTask
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
segmentGranularity, fireDepartmentConfig.getMaxPendingPersists()
);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);

View File

@ -28,17 +28,23 @@ import org.joda.time.Period;
*/
public class FireDepartmentConfig
{
// Fallback used when the configured maxPendingPersists is not greater than 0; final so it cannot be reassigned.
private static final int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final int maxPendingPersists;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxPendingPersists = maxPendingPersists > 0
? maxPendingPersists
: MAX_PENDING_PERSIST_BATCHES_DEFAULT;
Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");
@ -55,4 +61,10 @@ public class FireDepartmentConfig
{
return intermediatePersistPeriod;
}
/**
 * @return the pending-persist limit held by this config: the constructor argument when it was greater
 * than 0, otherwise the built-in default
 */
@JsonProperty
public int getMaxPendingPersists()
{
  return this.maxPendingPersists;
}
}

View File

@ -31,7 +31,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
@ -42,6 +41,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
@ -84,7 +84,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -92,43 +91,34 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
private final int maxPendingPersists;
private volatile boolean shuttingDown = false;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -136,7 +126,8 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
this.windowPeriod = windowPeriod;
@ -144,7 +135,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
Preconditions.checkArgument(maxPendingPersists > 0);
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@ -485,22 +478,13 @@ public class RealtimePlumberSchool implements PlumberSchool
private void initializeExecutors()
{
if (persistExecutor == null) {
persistExecutor = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_persist_%d")
.build()
// use a blocking single-threaded executor to throttle the firehose when writes to disk are slow
persistExecutor = Execs.newBlockingSingleThreaded(
"plumber_persist_%d", maxPendingPersists
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("plumber_scheduled_%d")
.build()
);
scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
}
}
@ -697,7 +681,8 @@ public class RealtimePlumberSchool implements PlumberSchool
/**
* Unannounces a given sink and removes all local references to it.
*/
private void abandonSegment(final long truncatedTime, final Sink sink) {
private void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));

View File

@ -78,7 +78,7 @@ public class RealtimeManagerTest
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FireDepartmentConfig(1, new Period("P1Y"), 1),
new FirehoseFactory()
{
@Override

View File

@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
IndexGranularity.HOUR, 1
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);