From 6fac267f17d745cf6aac9419ae06b306a8898f09 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 15 Sep 2024 01:22:28 -0700 Subject: [PATCH] MSQ: Improved worker cancellation. (#17046) * MSQ: Improved worker cancellation. Four changes: 1) FrameProcessorExecutor now requires that cancellationIds be registered with "registerCancellationId" prior to being used in "runFully" or "runAllFully". 2) FrameProcessorExecutor gains an "asExecutor" method, which allows that executor to be used as an executor for future callbacks in such a way that respects cancellationId. 3) RunWorkOrder gains a "stop" method, which cancels the current cancellationId and closes the current FrameContext. It blocks until both operations are complete. 4) Fixes a bug in RunAllFullyWidget where "processorManager.result()" was called outside "runAllFullyLock", which could cause it to be called out-of-order with "cleanup()" in case of cancellation or other error. Together, these changes help ensure cancellation does not have races. Once "cancel" is called for a given cancellationId, all existing processors and running callbacks are canceled and exit in an orderly manner. Future processors and callbacks with the same cancellationId are rejected before being executed. * Fix test. * Use execute, which doesn't return, to avoid errorprone complaints. * Fix some style stuff. * Further enhancements. * Fix style. --- .../frame/FrameChannelMergerBenchmark.java | 8 +- .../apache/druid/msq/exec/ControllerImpl.java | 54 ++++--- .../apache/druid/msq/exec/RunWorkOrder.java | 137 +++++++++++++++--- .../druid/msq/exec/RunWorkOrderListener.java | 2 +- .../org/apache/druid/msq/exec/WorkerImpl.java | 47 +++--- .../msq/querykit/FrameProcessorTestBase.java | 9 +- .../processor/FrameProcessorExecutor.java | 83 ++++++++--- .../frame/processor/RunAllFullyWidget.java | 6 +- .../processor/RunnableFrameProcessor.java | 65 +++++++++ .../druid/frame/processor/SuperSorter.java | 4 +- .../processor/FrameProcessorExecutorTest.java | 23 +++ .../processor/RunAllFullyWidgetTest.java | 4 +- 12 files changed, 354 insertions(+), 88 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java index a57b7a116c4..25f9015de2b 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java @@ -21,6 +21,7 @@ package org.apache.druid.benchmark.frame; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.guava.FutureUtils; @@ -203,6 +204,7 @@ public class FrameChannelMergerBenchmark private final List sortKey = ImmutableList.of(new KeyColumn(KEY, KeyOrder.ASCENDING)); private List> channelFrames; + private ListeningExecutorService innerExec; private FrameProcessorExecutor exec; private List channels; @@ -226,7 +228,7 @@ public class FrameChannelMergerBenchmark frameReader = FrameReader.create(signature); exec = new FrameProcessorExecutor( - MoreExecutors.listeningDecorator( + innerExec = MoreExecutors.listeningDecorator( Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName())) ) ); @@ -335,8 +337,8 @@ public class FrameChannelMergerBenchmark @TearDown(Level.Trial) public void tearDown() throws Exception { - exec.getExecutorService().shutdownNow(); - if (!exec.getExecutorService().awaitTermination(1, TimeUnit.MINUTES)) { + innerExec.shutdownNow(); + if (!innerExec.awaitTermination(1, TimeUnit.MINUTES)) { throw new ISE("Could not terminate executor after 1 minute"); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 4b63d85cda7..6d1ef21abbf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -224,6 +224,7 @@ import java.util.stream.StreamSupport; public class ControllerImpl implements Controller { private static final Logger log = new Logger(ControllerImpl.class); + private static final String RESULT_READER_CANCELLATION_ID = "result-reader"; private final String queryId; private final MSQSpec querySpec; @@ -2189,6 +2190,34 @@ public class ControllerImpl implements Controller } } + /** + * Create a result-reader executor for {@link RunQueryUntilDone#readQueryResults()}. + */ + private static FrameProcessorExecutor createResultReaderExec(final String queryId) + { + return new FrameProcessorExecutor( + MoreExecutors.listeningDecorator( + Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId + "]"))) + ); + } + + /** + * Cancel any currently-running work and shut down a result-reader executor, like one created by + * {@link #createResultReaderExec(String)}. + */ + private static void closeResultReaderExec(final FrameProcessorExecutor exec) + { + try { + exec.cancel(RESULT_READER_CANCELLATION_ID); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + exec.shutdownNow(); + } + } + private void stopExternalFetchers() { if (workerSketchFetcher != null) { @@ -2698,12 +2727,9 @@ public class ControllerImpl implements Controller inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds); } - final FrameProcessorExecutor resultReaderExec = new FrameProcessorExecutor( - MoreExecutors.listeningDecorator( - Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId() + "]"))) - ); + final FrameProcessorExecutor resultReaderExec = createResultReaderExec(queryId()); + resultReaderExec.registerCancellationId(RESULT_READER_CANCELLATION_ID); - final String cancellationId = "results-reader"; ReadableConcatFrameChannel resultsChannel = null; try { @@ -2713,7 +2739,7 @@ public class ControllerImpl implements Controller inputChannelFactory, () -> ArenaMemoryAllocator.createOnHeap(5_000_000), resultReaderExec, - cancellationId, + RESULT_READER_CANCELLATION_ID, null, MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context()) ); @@ -2747,7 +2773,7 @@ public class ControllerImpl implements Controller queryListener ); - queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, cancellationId); + queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, RESULT_READER_CANCELLATION_ID); // When results are done being read, kick the main thread. // Important: don't use FutureUtils.futureWithBaggage, because we need queryResultsReaderFuture to resolve @@ -2764,23 +2790,13 @@ public class ControllerImpl implements Controller e, () -> CloseableUtils.closeAll( finalResultsChannel, - () -> resultReaderExec.getExecutorService().shutdownNow() + () -> closeResultReaderExec(resultReaderExec) ) ); } // Result reader is set up. Register with the query-wide closer. - closer.register(() -> { - try { - resultReaderExec.cancel(cancellationId); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - resultReaderExec.getExecutorService().shutdownNow(); - } - }); + closer.register(() -> closeResultReaderExec(resultReaderExec)); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 4d028147af0..3d31d7e2c3e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -21,6 +21,7 @@ package org.apache.druid.msq.exec; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -56,6 +57,7 @@ import org.apache.druid.frame.processor.manager.ProcessorManager; import org.apache.druid.frame.processor.manager.ProcessorManagers; import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -67,6 +69,8 @@ import org.apache.druid.msq.counters.CpuCounters; import org.apache.druid.msq.indexing.CountingOutputChannelFactory; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.InputChannelsImpl; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; @@ -94,7 +98,6 @@ import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; -import org.apache.druid.utils.CloseableUtils; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import javax.annotation.Nullable; @@ -104,7 +107,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -112,7 +116,29 @@ import java.util.stream.Collectors; */ public class RunWorkOrder { - private final String controllerTaskId; + enum State + { + /** + * Initial state. Must be in this state to call {@link #startAsync()}. + */ + INIT, + + /** + * State entered upon calling {@link #startAsync()}. + */ + STARTED, + + /** + * State entered upon calling {@link #stop()}. + */ + STOPPING, + + /** + * State entered when a call to {@link #stop()} concludes. + */ + STOPPED + } + private final WorkOrder workOrder; private final InputChannelFactory inputChannelFactory; private final CounterTracker counterTracker; @@ -125,7 +151,9 @@ public class RunWorkOrder private final boolean reindex; private final boolean removeNullBytes; private final ByteTracker intermediateSuperSorterLocalStorageTracker; - private final AtomicBoolean started = new AtomicBoolean(); + private final AtomicReference state = new AtomicReference<>(State.INIT); + private final CountDownLatch stopLatch = new CountDownLatch(1); + private final AtomicReference> resultForListener = new AtomicReference<>(); @MonotonicNonNull private InputSliceReader inputSliceReader; @@ -141,7 +169,6 @@ public class RunWorkOrder private ListenableFuture stageOutputChannelsFuture; public RunWorkOrder( - final String controllerTaskId, final WorkOrder workOrder, final InputChannelFactory inputChannelFactory, final CounterTracker counterTracker, @@ -154,7 +181,6 @@ public class RunWorkOrder final boolean removeNullBytes ) { - this.controllerTaskId = controllerTaskId; this.workOrder = workOrder; this.inputChannelFactory = inputChannelFactory; this.counterTracker = counterTracker; @@ -180,15 +206,16 @@ public class RunWorkOrder * Execution proceeds asynchronously after this method returns. The {@link RunWorkOrderListener} passed to the * constructor of this instance can be used to track progress. */ - public void start() throws IOException + public void startAsync() { - if (started.getAndSet(true)) { - throw new ISE("Already started"); + if (!state.compareAndSet(State.INIT, State.STARTED)) { + throw new ISE("Cannot start from state[%s]", state); } final StageDefinition stageDef = workOrder.getStageDefinition(); try { + exec.registerCancellationId(cancellationId); makeInputSliceReader(); makeWorkOutputChannelFactory(); makeShuffleOutputChannelFactory(); @@ -205,16 +232,78 @@ public class RunWorkOrder setUpCompletionCallbacks(); } catch (Throwable t) { - // If start() has problems, cancel anything that was already kicked off, and close the FrameContext. + stopUnchecked(); + } + } + + /** + * Stops an execution that was previously initiated through {@link #startAsync()} and closes the {@link FrameContext}. + * May be called to cancel execution. Must also be called after successful execution in order to ensure that resources + * are all properly cleaned up. + * + * Blocks until execution is fully stopped. + */ + public void stop() throws InterruptedException + { + if (state.compareAndSet(State.INIT, State.STOPPING) + || state.compareAndSet(State.STARTED, State.STOPPING)) { + // Initiate stopping. + Throwable e = null; + try { exec.cancel(cancellationId); } - catch (Throwable t2) { - t.addSuppressed(t2); + catch (Throwable e2) { + e = e2; } - CloseableUtils.closeAndSuppressExceptions(frameContext, t::addSuppressed); - throw t; + try { + frameContext.close(); + } + catch (Throwable e2) { + if (e == null) { + e = e2; + } else { + e.addSuppressed(e2); + } + } + + try { + // notifyListener will ignore this cancellation error if work has already succeeded. + notifyListener(Either.error(new MSQException(CanceledFault.instance()))); + } + catch (Throwable e2) { + if (e == null) { + e = e2; + } else { + e.addSuppressed(e2); + } + } + + stopLatch.countDown(); + + if (e != null) { + Throwables.throwIfInstanceOf(e, InterruptedException.class); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + stopLatch.await(); + } + + /** + * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link InterruptedException}, this method sets + * the interrupt flag and throws an unchecked exception. + */ + public void stopUnchecked() + { + try { + stop(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } @@ -459,19 +548,33 @@ public class RunWorkOrder writeDurableStorageSuccessFile(); } - listener.onSuccess(resultObject); + notifyListener(Either.value(resultObject)); } @Override public void onFailure(final Throwable t) { - listener.onFailure(t); + notifyListener(Either.error(t)); } }, Execs.directExecutor() ); } + /** + * Notify {@link RunWorkOrderListener} that the job is done, if not already notified. + */ + private void notifyListener(final Either result) + { + if (resultForListener.compareAndSet(null, result)) { + if (result.isError()) { + listener.onFailure(result.error()); + } else { + listener.onSuccess(result.valueOrThrow()); + } + } + } + /** * Write {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} for a particular stage, if durable storage is enabled. */ @@ -561,7 +664,7 @@ public class RunWorkOrder ) { return DurableStorageOutputChannelFactory.createStandardImplementation( - controllerTaskId, + workerContext.queryId(), workOrder.getWorkerNumber(), workOrder.getStageNumber(), workerContext.workerId(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java index 19c3c6570fe..8bffd6f8179 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java @@ -25,7 +25,7 @@ import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; import javax.annotation.Nullable; /** - * Listener for various things that may happen during execution of {@link RunWorkOrder#start()}. Listener methods are + * Listener for various things that may happen during execution of {@link RunWorkOrder#startAsync()}. Listener methods are * fired in processing threads, so they must be thread-safe, and it is important that they run quickly. */ public interface RunWorkOrderListener diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 5d9f9b9db54..74e3850c6e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -367,28 +367,19 @@ public class WorkerImpl implements Worker final WorkerStageKernel kernel = kernelHolder.kernel; final WorkOrder workOrder = kernel.getWorkOrder(); final StageDefinition stageDefinition = workOrder.getStageDefinition(); - final String cancellationId = cancellationIdFor(stageDefinition.getId()); + final String cancellationId = cancellationIdFor(stageDefinition.getId(), workOrder.getWorkerNumber()); log.info( - "Processing work order for stage[%s]%s", + "Starting work order for stage[%s], workerNumber[%d]%s", stageDefinition.getId(), + workOrder.getWorkerNumber(), (log.isDebugEnabled() ? StringUtils.format(", payload[%s]", context.jsonMapper().writeValueAsString(workOrder)) : "") ); - final FrameContext frameContext = kernelHolder.processorCloser.register(context.frameContext(workOrder)); - kernelHolder.processorCloser.register(() -> { - try { - workerExec.cancel(cancellationId); - } - catch (InterruptedException e) { - // Strange that cancellation would itself be interrupted. Log and suppress. - log.warn(e, "Cancellation interrupted for stage[%s]", stageDefinition.getId()); - Thread.currentThread().interrupt(); - } - }); + final FrameContext frameContext = context.frameContext(workOrder); - // Set up cleanup functions for this work order. + // Set up resultsCloser (called when we are done reading results). kernelHolder.resultsCloser.register(() -> FileUtils.deleteDirectory(frameContext.tempDir())); kernelHolder.resultsCloser.register(() -> removeStageOutputChannels(stageDefinition.getId())); @@ -397,13 +388,9 @@ public class WorkerImpl implements Worker final InputChannelFactory inputChannelFactory = makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser); - // Start working on this stage immediately. - kernel.startReading(); - final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty(); final boolean includeAllCounters = context.includeAllCounters(); final RunWorkOrder runWorkOrder = new RunWorkOrder( - task.getControllerTaskId(), workOrder, inputChannelFactory, stageCounters.computeIfAbsent( @@ -419,7 +406,12 @@ public class WorkerImpl implements Worker MultiStageQueryContext.removeNullBytes(queryContext) ); - runWorkOrder.start(); + // Set up processorCloser (called when processing is done). + kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked); + + // Start working on this stage immediately. + kernel.startReading(); + runWorkOrder.startAsync(); kernelHolder.partitionBoundariesFuture = runWorkOrder.getStagePartitionBoundariesFuture(); } @@ -987,9 +979,9 @@ public class WorkerImpl implements Worker /** * Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}. */ - private static String cancellationIdFor(final StageId stageId) + private static String cancellationIdFor(final StageId stageId, final int workerNumber) { - return stageId.toString(); + return StringUtils.format("%s_%s", stageId, workerNumber); } /** @@ -1244,10 +1236,19 @@ public class WorkerImpl implements Worker private static class KernelHolder { private final WorkerStageKernel kernel; - private final Closer processorCloser; - private final Closer resultsCloser; private SettableFuture partitionBoundariesFuture; + /** + * Closer for processing. This is closed when all processing for a stage has completed. + */ + private final Closer processorCloser; + + /** + * Closer for results. This is closed when results for a stage are no longer needed. Always closed + * *after* {@link #processorCloser} is done closing. + */ + private final Closer resultsCloser; + public KernelHolder(WorkerStageKernel kernel) { this.kernel = kernel; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java index 439aa148a84..cde2b0ea4e9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.querykit; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; @@ -47,19 +48,21 @@ public class FrameProcessorTestBase extends InitializedNullHandlingTest { protected static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0); + private ListeningExecutorService innerExec; protected FrameProcessorExecutor exec; @Before public void setUp() { - exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec"))); + innerExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")); + exec = new FrameProcessorExecutor(innerExec); } @After public void tearDown() throws Exception { - exec.getExecutorService().shutdownNow(); - exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES); + innerExec.shutdownNow(); + innerExec.awaitTermination(10, TimeUnit.MINUTES); } protected ReadableInput makeChannelFromCursorFactory( diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index c0f79d30e58..f255fbe13a6 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -46,12 +46,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -61,7 +63,6 @@ import java.util.stream.Collectors; * If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a * same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks. */ -@SuppressWarnings("CheckReturnValue") public class FrameProcessorExecutor { private static final Logger log = new Logger(FrameProcessorExecutor.class); @@ -70,6 +71,10 @@ public class FrameProcessorExecutor private final Object lock = new Object(); + // Currently-active cancellationIds. + @GuardedBy("lock") + private final Set activeCancellationIds = new HashSet<>(); + // Futures that are active and therefore cancelable. // Does not include return futures: those are in cancelableReturnFutures. @GuardedBy("lock") @@ -96,19 +101,12 @@ public class FrameProcessorExecutor this.exec = exec; } - /** - * Returns the underlying executor service used by this executor. - */ - public ListeningExecutorService getExecutorService() - { - return exec; - } - /** * Runs a processor until it is done, and returns a future that resolves when execution is complete. * - * If "cancellationId" is provided, it can be used with the {@link #cancel(String)} method to cancel all processors - * currently running with the same cancellationId. + * If "cancellationId" is provided, it must have previously been registered with {@link #registerCancellationId}. + * Then, it can be used with the {@link #cancel(String)} method to cancel all processors with that + * same cancellationId. */ public ListenableFuture runFully(final FrameProcessor processor, @Nullable final String cancellationId) { @@ -116,6 +114,11 @@ public class FrameProcessorExecutor final List outputChannels = processor.outputChannels(); final SettableFuture finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId); + if (finished.isDone()) { + // Possibly due to starting life out being canceled. + return finished; + } + class ExecutorRunnable implements Runnable { private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels); @@ -152,7 +155,7 @@ public class FrameProcessorExecutor final IntSet await = result.awaitSet(); if (await.isEmpty()) { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } else if (result.isAwaitAll() || await.size() == 1) { final List> readabilityFutures = new ArrayList<>(); @@ -164,7 +167,7 @@ public class FrameProcessorExecutor } if (readabilityFutures.isEmpty()) { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } else { runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures)); } @@ -272,7 +275,7 @@ public class FrameProcessorExecutor public void onSuccess(final V ignored) { try { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } catch (Throwable e) { fail(e); @@ -390,7 +393,7 @@ public class FrameProcessorExecutor logProcessorStatusString(processor, finished, null); registerCancelableProcessor(processor, cancellationId); - exec.submit(runnable); + exec.execute(runnable); return finished; } @@ -423,8 +426,20 @@ public class FrameProcessorExecutor } /** - * Cancels all processors associated with a given cancellationId. Waits for the processors to exit before - * returning. + * Registers a cancellationId, so it can be provided to {@link #runFully} or {@link #runAllFully}. To avoid the + * set of active cancellationIds growing without bound, callers must also call {@link #cancel(String)} on the + * same cancellationId when done using it. + */ + public void registerCancellationId(final String cancellationId) + { + synchronized (lock) { + activeCancellationIds.add(cancellationId); + } + } + + /** + * Deregisters a cancellationId and cancels any currently-running processors associated with that cancellationId. + * Waits for any canceled processors to exit before returning. */ public void cancel(final String cancellationId) throws InterruptedException { @@ -435,6 +450,7 @@ public class FrameProcessorExecutor final Set> returnFuturesToCancel; synchronized (lock) { + activeCancellationIds.remove(cancellationId); futuresToCancel = cancelableFutures.removeAll(cancellationId); processorsToCancel = cancelableProcessors.removeAll(cancellationId); returnFuturesToCancel = cancelableReturnFutures.removeAll(cancellationId); @@ -457,6 +473,33 @@ public class FrameProcessorExecutor } } + /** + * Returns an {@link Executor} that executes using the same underlying service, and that is also connected to + * cancellation through {@link #cancel(String)}. + * + * @param cancellationId cancellation ID for the executor + */ + public Executor asExecutor(@Nullable final String cancellationId) + { + return command -> runFully(new RunnableFrameProcessor(command), cancellationId); + } + + /** + * Shuts down the underlying executor service immediately. + */ + public void shutdownNow() + { + exec.shutdownNow(); + } + + /** + * Returns the underlying executor service used by this executor. + */ + ListeningExecutorService getExecutorService() + { + return exec; + } + /** * Register a future that will be canceled when the provided {@code cancellationId} is canceled. * @@ -472,6 +515,12 @@ public class FrameProcessorExecutor { if (cancellationId != null) { synchronized (lock) { + if (!activeCancellationIds.contains(cancellationId)) { + // Cancel and return immediately. + future.cancel(true); + return future; + } + final SetMultimap> map = isReturn ? cancelableReturnFutures : cancelableFutures; map.put(cancellationId, future); future.addListener( diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java index a1a1c0f8712..7f79a319c28 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java @@ -306,9 +306,11 @@ public class RunAllFullyWidget } if (isDone) { - finished.compareAndSet(null, Either.value(processorManager.result())); - synchronized (runAllFullyLock) { + if (finished.get() == null) { + finished.compareAndSet(null, Either.value(processorManager.result())); + } + cleanupIfNoMoreProcessors(); } } else { diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java new file mode 100644 index 00000000000..697879490e1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; + +import java.util.Collections; +import java.util.List; + +/** + * Frame processor that simply runs a {@link Runnable}, once. + */ +public class RunnableFrameProcessor implements FrameProcessor +{ + private final Runnable runnable; + + public RunnableFrameProcessor(Runnable runnable) + { + this.runnable = runnable; + } + + @Override + public List inputChannels() + { + return Collections.emptyList(); + } + + @Override + public List outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait runIncrementally(IntSet readableInputs) + { + runnable.run(); + return ReturnOrAwait.returnObject(null); + } + + @Override + public void cleanup() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java index e30f2e77b02..b8b74a2b797 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java @@ -297,7 +297,7 @@ public class SuperSorter setAllDoneIfPossible(); } }, - exec.getExecutorService() + exec.asExecutor(cancellationId) ); return FutureUtils.futureWithBaggage( @@ -813,7 +813,7 @@ public class SuperSorter }, // Must run in exec, instead of in the same thread, to avoid running callback immediately if the // worker happens to finish super-quickly. - exec.getExecutorService() + exec.asExecutor(cancellationId) ); } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java index 0f50624078b..4ed2c610525 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java @@ -222,6 +222,7 @@ public class FrameProcessorExecutorTest final SettableFuture future = SettableFuture.create(); final String cancellationId = "xyzzy"; + exec.registerCancellationId(cancellationId); Assert.assertSame(future, exec.registerCancelableFuture(future, false, cancellationId)); exec.cancel(cancellationId); @@ -236,6 +237,8 @@ public class FrameProcessorExecutorTest { final SleepyFrameProcessor processor = new SleepyFrameProcessor(); final String cancellationId = "xyzzy"; + + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runFully(processor, cancellationId); processor.awaitRun(); @@ -254,6 +257,8 @@ public class FrameProcessorExecutorTest { final SleepyFrameProcessor processor = new SleepyFrameProcessor(); final String cancellationId = "xyzzy"; + + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runFully(processor, cancellationId); processor.awaitRun(); @@ -314,6 +319,8 @@ public class FrameProcessorExecutorTest // Start up all systems at once. for (final String systemId : systemGeneratorsMap.keySet()) { + exec.registerCancellationId(systemId); + for (InfiniteFrameProcessor generator : systemGeneratorsMap.get(systemId)) { processorFutureMap.put(generator, exec.runFully(generator, systemId)); } @@ -391,6 +398,22 @@ public class FrameProcessorExecutorTest // Just making sure no error is thrown when we refer to a nonexistent cancellationId. exec.cancel("nonexistent"); } + + @Test + public void test_runFully_nonexistentCancellationId() + { + final SleepyFrameProcessor processor = new SleepyFrameProcessor(); + final String cancellationId = "xyzzy"; + + // Don't registerCancellationId(cancellationId). + final ListenableFuture future = exec.runFully(processor, cancellationId); + + // Future should be immediately canceled, without running the processor. + Assert.assertTrue(future.isDone()); + Assert.assertTrue(future.isCancelled()); + Assert.assertFalse(processor.didGetInterrupt()); + Assert.assertFalse(processor.didCleanup()); + } } public abstract static class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest diff --git a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java index 7cd1e980428..d0ae5a986a0 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java @@ -409,6 +409,8 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP .mapToObj(i -> new SleepyFrameProcessor()) .collect(Collectors.toList()); + final String cancellationId = "xyzzy"; + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runAllFully( possiblyDelay( ensureClose( @@ -418,7 +420,7 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP ), maxOutstandingProcessors, bouncer, - "xyzzy" + cancellationId ); for (int i = 0; i < expectedRunningProcessors; i++) {