TaskMaster deadlock fix (#4548)

* Stop RemoteTaskRunner's cleanupExec via TaskMaster's lifecycle, not the globally injected lifecycle

* Prohibit starting a Lifecycle twice; make Lifecycle reject addMaybeStartHandler() attempts while it is stopping, rather than entering a deadlock (see the sketch below)

* Fix Lifecycle.addMaybeStartHandler()

* Remove RemoteTaskRunnerFactoryTest

* Add docs

* Language

* Address comments

* Fix RemoteTaskRunnerTestUtils
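
A minimal, runnable sketch of the behavior described above, modeled on the LifecycleTest case added by this commit. The class name LifecycleStopRejectionSketch and the no-op handler bodies are illustrative assumptions, not part of the commit; the Lifecycle calls it exercises (addHandler, addMaybeStartHandler, start, stop) are the API changed here. Once one thread has entered Lifecycle.stop(), a concurrent addMaybeStartHandler() call now fails fast with an IllegalStateException instead of blocking on the lifecycle's internal lock, which is the deadlock reported in https://github.com/druid-io/druid/issues/3579.

import io.druid.java.util.common.lifecycle.Lifecycle;

import java.util.concurrent.CountDownLatch;

public class LifecycleStopRejectionSketch
{
  public static void main(String[] args) throws Exception
  {
    final CountDownLatch reachedStop = new CountDownLatch(1);
    final CountDownLatch releaseStop = new CountDownLatch(1);
    final Lifecycle lifecycle = new Lifecycle();

    // Handler whose stop() blocks, holding the lifecycle in its stopping state.
    lifecycle.addHandler(
        new Lifecycle.Handler()
        {
          @Override
          public void start() throws Exception
          {
            // do nothing
          }

          @Override
          public void stop()
          {
            reachedStop.countDown();
            try {
              releaseStop.await();
            }
            catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
          }
        }
    );

    lifecycle.start();
    new Thread(lifecycle::stop).start(); // blocks inside the handler's stop() above
    reachedStop.await();

    try {
      // Pre-fix, this call blocked on the monitor held by the stopping thread; now it fails fast.
      lifecycle.addMaybeStartHandler(
          new Lifecycle.Handler()
          {
            @Override
            public void start() throws Exception
            {
              // do nothing
            }

            @Override
            public void stop()
            {
              // do nothing
            }
          }
      );
    }
    catch (IllegalStateException e) {
      System.out.println("Rejected while stopping: " + e.getMessage());
    }
    finally {
      releaseStop.countDown(); // let the stopping thread finish
    }
  }
}

Against the pre-fix Lifecycle, the same sequence hangs on the shared handlers monitor instead of failing fast, which is the kind of hang this commit removes from the RemoteTaskRunner cleanup path.
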
Roman Leventov 2017-08-08 00:28:43 +03:00 committed by Fangjin Yang
parent aa7e4ae5e4
commit 486b7a2347
6 changed files with 175 additions and 252 deletions

View File

@@ -107,7 +107,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -189,7 +188,6 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
@@ -206,7 +204,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
.build();
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.cleanupExec = MoreExecutors.listeningDecorator(
ScheduledExecutors.fixed(1, "RemoteTaskRunner-Scheduled-Cleanup--%d")
);
this.provisioningStrategy = provisioningStrategy;
this.runPendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
@@ -371,6 +371,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (runPendingTasksExec != null) {
runPendingTasksExec.shutdown();
}
if (cleanupExec != null) {
cleanupExec.shutdown();
}
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@@ -30,7 +30,6 @@ import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
@@ -47,7 +46,6 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ScheduledExecutorFactory factory;
@Inject
public RemoteTaskRunnerFactory(
@@ -57,7 +55,6 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ScheduledExecutorFactory factory,
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
)
@@ -70,7 +67,6 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
this.workerConfigRef = workerConfigRef;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.factory = factory;
}
@Override
@@ -84,10 +80,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),
provisioningSchedulerConfig.isDoAutoscale()
? provisioningStrategy
: new NoopProvisioningStrategy<>()
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>()
);
}
}

View File

@@ -1,143 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.metamx.http.client.HttpClient;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
import io.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
public class RemoteTaskRunnerFactoryTest
{
private static final Joiner joiner = Joiner.on("/");
private static final String basePath = "/test/druid";
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
@Before
public void setUp() throws Exception
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.blockUntilConnected();
}
@After
public void tearDown() throws Exception
{
cf.close();
testingCluster.stop();
}
@Test
public void testExecNotSharedBetweenRunners()
{
final AtomicInteger executorCount = new AtomicInteger(0);
RemoteTaskRunnerConfig config = new RemoteTaskRunnerConfig();
IndexerZkConfig indexerZkConfig = new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
}, null, null, null, null, null
);
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
Supplier<WorkerBehaviorConfig> workerBehaviorConfig = EasyMock.createMock(Supplier.class);
ScheduledExecutorFactory executorFactory = new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int i, String s)
{
executorCount.incrementAndGet();
return ScheduledExecutors.fixed(i, s);
}
};
SimpleWorkerProvisioningConfig provisioningConfig = new SimpleWorkerProvisioningConfig();
ProvisioningSchedulerConfig provisioningSchedulerConfig = new ProvisioningSchedulerConfig()
{
@Override
public boolean isDoAutoscale()
{
return true;
}
};
RemoteTaskRunnerFactory factory = new RemoteTaskRunnerFactory(
cf,
config,
indexerZkConfig,
jsonMapper,
httpClient,
workerBehaviorConfig,
executorFactory,
provisioningSchedulerConfig,
new SimpleWorkerProvisioningStrategy(
provisioningConfig,
workerBehaviorConfig,
provisioningSchedulerConfig
)
);
Assert.assertEquals(0, executorCount.get());
RemoteTaskRunner remoteTaskRunner1 = factory.build();
Assert.assertEquals(1, executorCount.get());
RemoteTaskRunner remoteTaskRunner2 = factory.build();
Assert.assertEquals(2, executorCount.get());
}
}

View File

@@ -39,7 +39,6 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
@@ -48,7 +47,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -133,7 +131,6 @@ public class RemoteTaskRunnerTestUtils
new PathChildrenCacheFactory.Builder(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
provisioningStrategy
);
@@ -231,7 +228,6 @@ public class RemoteTaskRunnerTestUtils
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
@@ -243,7 +239,6 @@ public class RemoteTaskRunnerTestUtils
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
cleanupExec,
provisioningStrategy
);
}

View File

@@ -19,20 +19,21 @@
package io.druid.java.util.common.lifecycle;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A manager of object Lifecycles.
@@ -53,22 +54,34 @@ public class Lifecycle
{
private static final Logger log = new Logger(Lifecycle.class);
private final Map<Stage, CopyOnWriteArrayList<Handler>> handlers;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
private volatile Stage currStage = null;
public static enum Stage
public enum Stage
{
NORMAL,
LAST
}
private enum State
{
/** Lifecycle's state before {@link #start()} is called. */
NOT_STARTED,
/** Lifecycle's state since {@link #start()} and before {@link #stop()} is called. */
RUNNING,
/** Lifecycle's state since {@link #stop()} is called. */
STOP
}
private final NavigableMap<Stage, CopyOnWriteArrayList<Handler>> handlers;
/** This lock is used to linearize all calls to Handler.start() and Handler.stop() on the managed handlers. */
private final Lock startStopLock = new ReentrantLock();
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
private Stage currStage = null;
private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
public Lifecycle()
{
handlers = Maps.newHashMap();
handlers = new TreeMap<>();
for (Stage stage : Stage.values()) {
handlers.put(stage, new CopyOnWriteArrayList<Handler>());
handlers.put(stage, new CopyOnWriteArrayList<>());
}
}
@@ -153,12 +166,18 @@ public class Lifecycle
*/
public void addHandler(Handler handler, Stage stage)
{
synchronized (handlers) {
if (started.get()) {
if (!startStopLock.tryLock()) {
throw new ISE("Cannot add a handler in the process of Lifecycle starting or stopping");
}
try {
if (!state.get().equals(State.NOT_STARTED)) {
throw new ISE("Cannot add a handler after the Lifecycle has started, it doesn't work that way.");
}
handlers.get(stage).add(handler);
}
finally {
startStopLock.unlock();
}
}
/**
@@ -241,59 +260,85 @@ public class Lifecycle
*/
public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception
{
synchronized (handlers) {
if (started.get()) {
if (currStage == null || stage.compareTo(currStage) < 1) {
if (!startStopLock.tryLock()) {
// (*) This check is why the state should be changed before startStopLock.lock() in stop(). It makes it possible to
// spot incorrect use of Lifecycle instead of entering a deadlock, like https://github.com/druid-io/druid/issues/3579.
if (state.get().equals(State.STOP)) {
throw new ISE("Cannot add a handler in the process of Lifecycle stopping");
}
startStopLock.lock();
}
try {
if (state.get().equals(State.STOP)) {
throw new ISE("Cannot add a handler after the Lifecycle has stopped");
}
if (state.get().equals(State.RUNNING)) {
if (stage.compareTo(currStage) <= 0) {
handler.start();
}
}
handlers.get(stage).add(handler);
}
finally {
startStopLock.unlock();
}
}
public void start() throws Exception
{
synchronized (handlers) {
if (!started.compareAndSet(false, true)) {
startStopLock.lock();
try {
if (!state.get().equals(State.NOT_STARTED)) {
throw new ISE("Already started");
}
for (Stage stage : stagesOrdered()) {
currStage = stage;
for (Handler handler : handlers.get(stage)) {
if (!state.compareAndSet(State.NOT_STARTED, State.RUNNING)) {
throw new ISE("stop() is called concurrently with start()");
}
for (Map.Entry<Stage, ? extends List<Handler>> e : handlers.entrySet()) {
currStage = e.getKey();
for (Handler handler : e.getValue()) {
handler.start();
}
}
}
finally {
startStopLock.unlock();
}
}
public void stop()
{
synchronized (handlers) {
if (!started.compareAndSet(true, false)) {
log.info("Already stopped and stop was called. Silently skipping");
return;
}
List<Exception> exceptions = Lists.newArrayList();
for (Stage stage : Lists.reverse(stagesOrdered())) {
final CopyOnWriteArrayList<Handler> stageHandlers = handlers.get(stage);
final ListIterator<Handler> iter = stageHandlers.listIterator(stageHandlers.size());
while (iter.hasPrevious()) {
final Handler handler = iter.previous();
// This CAS outside of a block guarded by startStopLock is the only reason why state is AtomicReference rather than
// a simple variable. State change before startStopLock.lock() is needed for the new state visibility during the
// check in addMaybeStartHandler() marked by (*).
if (!state.compareAndSet(State.RUNNING, State.STOP)) {
log.info("Already stopped and stop was called. Silently skipping");
return;
}
startStopLock.lock();
try {
RuntimeException thrown = null;
for (List<Handler> stageHandlers : handlers.descendingMap().values()) {
for (Handler handler : Lists.reverse(stageHandlers)) {
try {
handler.stop();
}
catch (Exception e) {
catch (RuntimeException e) {
log.warn(e, "exception thrown when stopping %s", handler);
exceptions.add(e);
if (thrown == null) {
thrown = e;
}
}
}
}
if (!exceptions.isEmpty()) {
throw Throwables.propagate(exceptions.get(0));
if (thrown != null) {
throw thrown;
}
}
finally {
startStopLock.unlock();
}
}
public void ensureShutdownHook()
@@ -321,12 +366,6 @@ public class Lifecycle
Thread.currentThread().join();
}
private static List<Stage> stagesOrdered()
{
return Arrays.asList(Stage.NORMAL, Stage.LAST);
}
public static interface Handler
{
public void start() throws Exception;
@@ -393,11 +432,6 @@ public class Lifecycle
}
}
public boolean isStarted()
{
return started.get();
}
private static class StartCloseHandler implements Handler
{
private static final Logger log = new Logger(StartCloseHandler.class);

View File

@@ -19,7 +19,6 @@
package io.druid.java.util.common.lifecycle;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -33,15 +32,31 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class LifecycleTest
{
private static final Lifecycle.Handler dummyHandler = new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
// do nothing
}
@Override
public void stop()
{
// do nothing
}
};
@Test
public void testConcurrentStartStopOnce() throws Exception
{
@@ -49,8 +64,7 @@ public class LifecycleTest
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
final Lifecycle lifecycle = new Lifecycle();
final AtomicLong startedCount = new AtomicLong(0L);
final AtomicLong failedCount = new AtomicLong(0L);
final AtomicLong handlerFailedCount = new AtomicLong(0L);
final Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler()
{
final AtomicBoolean started = new AtomicBoolean(false);
@@ -59,65 +73,48 @@ public class LifecycleTest
public void start() throws Exception
{
if (!started.compareAndSet(false, true)) {
failedCount.incrementAndGet();
handlerFailedCount.incrementAndGet();
throw new ISE("Already started");
}
startedCount.incrementAndGet();
}
@Override
public void stop()
{
if (!started.compareAndSet(true, false)) {
failedCount.incrementAndGet();
handlerFailedCount.incrementAndGet();
throw new ISE("Not yet started started");
}
}
};
lifecycle.addHandler(exceptionalHandler);
Collection<ListenableFuture<?>> futures = new ArrayList<>(numThreads);
final CyclicBarrier barrier = new CyclicBarrier(numThreads);
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean threadsStartLatch = new AtomicBoolean(false);
final AtomicInteger threadFailedCount = new AtomicInteger(0);
for (int i = 0; i < numThreads; ++i) {
futures.add(
executorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
for (int i = 0; i < 1024; ++i) {
if (started.compareAndSet(false, true)) {
lifecycle.start();
}
barrier.await();
lifecycle.stop();
barrier.await();
started.set(false);
barrier.await();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
executorService.submit(() -> {
try {
while (!threadsStartLatch.get()) {
// await
}
)
lifecycle.start();
}
catch (Exception e) {
threadFailedCount.incrementAndGet();
}
})
);
}
try {
threadsStartLatch.set(true);
Futures.allAsList(futures).get();
}
finally {
lifecycle.stop();
}
Assert.assertEquals(0, failedCount.get());
Assert.assertTrue(startedCount.get() > 0);
Assert.assertEquals(numThreads - 1, threadFailedCount.get());
Assert.assertEquals(0, handlerFailedCount.get());
executorService.shutdownNow();
}
@@ -125,7 +122,6 @@ public class LifecycleTest
public void testStartStopOnce() throws Exception
{
final Lifecycle lifecycle = new Lifecycle();
final AtomicLong startedCount = new AtomicLong(0L);
final AtomicLong failedCount = new AtomicLong(0L);
Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler()
{
@@ -138,7 +134,6 @@ public class LifecycleTest
failedCount.incrementAndGet();
throw new ISE("Already started");
}
startedCount.incrementAndGet();
}
@Override
@@ -155,9 +150,6 @@ public class LifecycleTest
lifecycle.stop();
lifecycle.stop();
lifecycle.stop();
lifecycle.start();
lifecycle.stop();
Assert.assertEquals(2, startedCount.get());
Assert.assertEquals(0, failedCount.get());
Exception ex = null;
try {
@@ -260,7 +252,7 @@ public class LifecycleTest
private final List<Integer> orderOfStarts;
private final List<Integer> orderOfStops;
public ObjectToBeLifecycled(
ObjectToBeLifecycled(
int id,
List<Integer> orderOfStarts,
List<Integer> orderOfStops
@@ -283,4 +275,52 @@ public class LifecycleTest
orderOfStops.add(id);
}
}
@Test
public void testFailAddToLifecycleDuringStopMethod() throws Exception
{
CountDownLatch reachedStop = new CountDownLatch(1);
CountDownLatch stopper = new CountDownLatch(1);
Lifecycle.Handler stoppingHandler = new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
// do nothing
}
@Override
public void stop()
{
reachedStop.countDown();
try {
stopper.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
Lifecycle lifecycle = new Lifecycle();
lifecycle.addHandler(stoppingHandler);
lifecycle.start();
new Thread(lifecycle::stop).start(); // will stop at stoppingHandler.stop()
reachedStop.await();
try {
lifecycle.addHandler(dummyHandler);
Assert.fail("Expected exception");
}
catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("Cannot add a handler"));
}
try {
lifecycle.addMaybeStartHandler(dummyHandler);
Assert.fail("Expected exception");
}
catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("Cannot add a handler"));
}
}
}