[ML] Prevent submit after autodetect worker is stopped (#37700)

Runnables can be submitted to
AutodetectProcessManager.AutodetectWorkerExecutorService
without error after it has been shutdown which can lead
to requests timing out as their handlers are never called
by the terminated executor.

This change throws an EsRejectedExecutionException if a
runnable is submitted after after the shutdown and calls
AbstractRunnable.onRejection on any tasks not run.

Closes #37108
This commit is contained in:
David Kyle 2019-01-29 15:09:40 +00:00 committed by David Roberts
parent 3c9f7031b9
commit 6d1693ff49
2 changed files with 96 additions and 1 deletions

View File

@ -77,6 +77,7 @@ import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -831,7 +832,16 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
@Override
public void execute(Runnable command) {
public synchronized void execute(Runnable command) {
if (isShutdown()) {
EsRejectedExecutionException rejected = new EsRejectedExecutionException("autodetect worker service has shutdown", true);
if (command instanceof AbstractRunnable) {
((AbstractRunnable) command).onRejection(rejected);
} else {
throw rejected;
}
}
boolean added = queue.offer(contextHolder.preserveContext(command));
if (added == false) {
throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS);
@ -851,6 +861,21 @@ public class AutodetectProcessManager implements ClusterStateListener {
EsExecutors.rethrowErrors(contextHolder.unwrap(runnable));
}
}
synchronized (this) {
// if shutdown with tasks pending notify the handlers
if (queue.isEmpty() == false) {
List<Runnable> notExecuted = new ArrayList<>();
queue.drainTo(notExecuted);
for (Runnable runnable : notExecuted) {
if (runnable instanceof AbstractRunnable) {
((AbstractRunnable) runnable).onRejection(
new EsRejectedExecutionException("unable to process as autodetect worker service has shutdown", true));
}
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -54,6 +55,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -72,9 +74,11 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -125,6 +129,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
private Set<MlFilter> filters = new HashSet<>();
private ThreadPool threadPool;
@Before
public void setup() throws Exception {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
@ -159,8 +165,16 @@ public class AutodetectProcessManagerTests extends ESTestCase {
handler.accept(buildAutodetectParams());
return null;
}).when(jobResultsProvider).getAutodetectParams(any(), any(), any());
threadPool = new TestThreadPool("AutodetectProcessManagerTests");
}
@After
public void stopThreadPool() throws InterruptedException {
terminate(threadPool);
}
public void testMaxOpenJobsSetting_givenDefault() {
int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
assertEquals(20, maxOpenJobs);
@ -690,6 +704,62 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}
}
public void testAutodetectWorkerExecutorService_SubmitAfterShutdown() {
AutodetectProcessManager.AutodetectWorkerExecutorService executor =
new AutodetectWorkerExecutorService(new ThreadContext(Settings.EMPTY));
threadPool.generic().execute(() -> executor.start());
executor.shutdown();
expectThrows(EsRejectedExecutionException.class, () -> executor.execute(() -> {}));
}
public void testAutodetectWorkerExecutorService_TasksNotExecutedCallHandlerOnShutdown()
throws InterruptedException, ExecutionException {
AutodetectProcessManager.AutodetectWorkerExecutorService executor =
new AutodetectWorkerExecutorService(new ThreadContext(Settings.EMPTY));
CountDownLatch latch = new CountDownLatch(1);
Future<?> executorFinished = threadPool.generic().submit(() -> executor.start());
// run a task that will block while the others are queued up
executor.execute(() -> {
try {
latch.await();
} catch (InterruptedException e) {
}
});
AtomicBoolean runnableShouldNotBeCalled = new AtomicBoolean(false);
executor.execute(() -> runnableShouldNotBeCalled.set(true));
AtomicInteger onFailureCallCount = new AtomicInteger();
AtomicInteger doRunCallCount = new AtomicInteger();
for (int i=0; i<2; i++) {
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
onFailureCallCount.incrementAndGet();
}
@Override
protected void doRun() {
doRunCallCount.incrementAndGet();
}
});
}
// now shutdown
executor.shutdown();
latch.countDown();
executorFinished.get();
assertFalse(runnableShouldNotBeCalled.get());
// the AbstractRunnables should have had their callbacks called
assertEquals(2, onFailureCallCount.get());
assertEquals(0, doRunCallCount.get());
}
private AutodetectProcessManager createNonSpyManager(String jobId) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);