MSQ: Improved worker cancellation. (#17046)

* MSQ: Improved worker cancellation.

Four changes:

1) FrameProcessorExecutor now requires that cancellationIds be registered
   with "registerCancellationId" prior to being used in "runFully" or "runAllFully".

2) FrameProcessorExecutor gains an "asExecutor" method, which allows that
   executor to be used as an executor for future callbacks in such a way
   that respects cancellationId.

3) RunWorkOrder gains a "stop" method, which cancels the current
   cancellationId and closes the current FrameContext. It blocks until
   both operations are complete.

4) Fixes a bug in RunAllFullyWidget where "processorManager.result()" was
   called outside "runAllFullyLock", which could cause it to be called
   out-of-order with "cleanup()" in case of cancellation or other error.

Together, these changes help ensure cancellation does not have races.
Once "cancel" is called for a given cancellationId, all existing processors
and running callbacks are canceled and exit in an orderly manner. Future
processors and callbacks with the same cancellationId are rejected
before being executed.

* Fix test.

* Use execute, which doesn't return, to avoid errorprone complaints.

* Fix some style stuff.

* Further enhancements.

* Fix style.
This commit is contained in:
Gian Merlino 2024-09-15 01:22:28 -07:00 committed by GitHub
parent a276871dd0
commit 6fac267f17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 354 additions and 88 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.benchmark.frame;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.FutureUtils;
@ -203,6 +204,7 @@ public class FrameChannelMergerBenchmark
private final List<KeyColumn> sortKey = ImmutableList.of(new KeyColumn(KEY, KeyOrder.ASCENDING));
private List<List<Frame>> channelFrames;
private ListeningExecutorService innerExec;
private FrameProcessorExecutor exec;
private List<BlockingQueueFrameChannel> channels;
@ -226,7 +228,7 @@ public class FrameChannelMergerBenchmark
frameReader = FrameReader.create(signature);
exec = new FrameProcessorExecutor(
MoreExecutors.listeningDecorator(
innerExec = MoreExecutors.listeningDecorator(
Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName()))
)
);
@ -335,8 +337,8 @@ public class FrameChannelMergerBenchmark
@TearDown(Level.Trial)
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
if (!exec.getExecutorService().awaitTermination(1, TimeUnit.MINUTES)) {
innerExec.shutdownNow();
if (!innerExec.awaitTermination(1, TimeUnit.MINUTES)) {
throw new ISE("Could not terminate executor after 1 minute");
}
}

View File

@ -224,6 +224,7 @@ import java.util.stream.StreamSupport;
public class ControllerImpl implements Controller
{
private static final Logger log = new Logger(ControllerImpl.class);
private static final String RESULT_READER_CANCELLATION_ID = "result-reader";
private final String queryId;
private final MSQSpec querySpec;
@ -2189,6 +2190,34 @@ public class ControllerImpl implements Controller
}
}
/**
* Create a result-reader executor for {@link RunQueryUntilDone#readQueryResults()}.
*/
private static FrameProcessorExecutor createResultReaderExec(final String queryId)
{
return new FrameProcessorExecutor(
MoreExecutors.listeningDecorator(
Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId + "]")))
);
}
/**
* Cancel any currently-running work and shut down a result-reader executor, like one created by
* {@link #createResultReaderExec(String)}.
*/
private static void closeResultReaderExec(final FrameProcessorExecutor exec)
{
try {
exec.cancel(RESULT_READER_CANCELLATION_ID);
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
exec.shutdownNow();
}
}
private void stopExternalFetchers()
{
if (workerSketchFetcher != null) {
@ -2698,12 +2727,9 @@ public class ControllerImpl implements Controller
inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds);
}
final FrameProcessorExecutor resultReaderExec = new FrameProcessorExecutor(
MoreExecutors.listeningDecorator(
Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId() + "]")))
);
final FrameProcessorExecutor resultReaderExec = createResultReaderExec(queryId());
resultReaderExec.registerCancellationId(RESULT_READER_CANCELLATION_ID);
final String cancellationId = "results-reader";
ReadableConcatFrameChannel resultsChannel = null;
try {
@ -2713,7 +2739,7 @@ public class ControllerImpl implements Controller
inputChannelFactory,
() -> ArenaMemoryAllocator.createOnHeap(5_000_000),
resultReaderExec,
cancellationId,
RESULT_READER_CANCELLATION_ID,
null,
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context())
);
@ -2747,7 +2773,7 @@ public class ControllerImpl implements Controller
queryListener
);
queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, cancellationId);
queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, RESULT_READER_CANCELLATION_ID);
// When results are done being read, kick the main thread.
// Important: don't use FutureUtils.futureWithBaggage, because we need queryResultsReaderFuture to resolve
@ -2764,23 +2790,13 @@ public class ControllerImpl implements Controller
e,
() -> CloseableUtils.closeAll(
finalResultsChannel,
() -> resultReaderExec.getExecutorService().shutdownNow()
() -> closeResultReaderExec(resultReaderExec)
)
);
}
// Result reader is set up. Register with the query-wide closer.
closer.register(() -> {
try {
resultReaderExec.cancel(cancellationId);
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
resultReaderExec.getExecutorService().shutdownNow();
}
});
closer.register(() -> closeResultReaderExec(resultReaderExec));
}
/**

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -56,6 +57,7 @@ import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -67,6 +69,8 @@ import org.apache.druid.msq.counters.CpuCounters;
import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.InputChannelsImpl;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
@ -94,7 +98,6 @@ import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable;
@ -104,7 +107,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@ -112,7 +116,29 @@ import java.util.stream.Collectors;
*/
public class RunWorkOrder
{
private final String controllerTaskId;
enum State
{
/**
* Initial state. Must be in this state to call {@link #startAsync()}.
*/
INIT,
/**
* State entered upon calling {@link #startAsync()}.
*/
STARTED,
/**
* State entered upon calling {@link #stop()}.
*/
STOPPING,
/**
* State entered when a call to {@link #stop()} concludes.
*/
STOPPED
}
private final WorkOrder workOrder;
private final InputChannelFactory inputChannelFactory;
private final CounterTracker counterTracker;
@ -125,7 +151,9 @@ public class RunWorkOrder
private final boolean reindex;
private final boolean removeNullBytes;
private final ByteTracker intermediateSuperSorterLocalStorageTracker;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicReference<Either<Throwable, Object>> resultForListener = new AtomicReference<>();
@MonotonicNonNull
private InputSliceReader inputSliceReader;
@ -141,7 +169,6 @@ public class RunWorkOrder
private ListenableFuture<OutputChannels> stageOutputChannelsFuture;
public RunWorkOrder(
final String controllerTaskId,
final WorkOrder workOrder,
final InputChannelFactory inputChannelFactory,
final CounterTracker counterTracker,
@ -154,7 +181,6 @@ public class RunWorkOrder
final boolean removeNullBytes
)
{
this.controllerTaskId = controllerTaskId;
this.workOrder = workOrder;
this.inputChannelFactory = inputChannelFactory;
this.counterTracker = counterTracker;
@ -180,15 +206,16 @@ public class RunWorkOrder
* Execution proceeds asynchronously after this method returns. The {@link RunWorkOrderListener} passed to the
* constructor of this instance can be used to track progress.
*/
public void start() throws IOException
public void startAsync()
{
if (started.getAndSet(true)) {
throw new ISE("Already started");
if (!state.compareAndSet(State.INIT, State.STARTED)) {
throw new ISE("Cannot start from state[%s]", state);
}
final StageDefinition stageDef = workOrder.getStageDefinition();
try {
exec.registerCancellationId(cancellationId);
makeInputSliceReader();
makeWorkOutputChannelFactory();
makeShuffleOutputChannelFactory();
@ -205,16 +232,78 @@ public class RunWorkOrder
setUpCompletionCallbacks();
}
catch (Throwable t) {
// If start() has problems, cancel anything that was already kicked off, and close the FrameContext.
stopUnchecked();
}
}
/**
* Stops an execution that was previously initiated through {@link #startAsync()} and closes the {@link FrameContext}.
* May be called to cancel execution. Must also be called after successful execution in order to ensure that resources
* are all properly cleaned up.
*
* Blocks until execution is fully stopped.
*/
public void stop() throws InterruptedException
{
if (state.compareAndSet(State.INIT, State.STOPPING)
|| state.compareAndSet(State.STARTED, State.STOPPING)) {
// Initiate stopping.
Throwable e = null;
try {
exec.cancel(cancellationId);
}
catch (Throwable t2) {
t.addSuppressed(t2);
catch (Throwable e2) {
e = e2;
}
CloseableUtils.closeAndSuppressExceptions(frameContext, t::addSuppressed);
throw t;
try {
frameContext.close();
}
catch (Throwable e2) {
if (e == null) {
e = e2;
} else {
e.addSuppressed(e2);
}
}
try {
// notifyListener will ignore this cancellation error if work has already succeeded.
notifyListener(Either.error(new MSQException(CanceledFault.instance())));
}
catch (Throwable e2) {
if (e == null) {
e = e2;
} else {
e.addSuppressed(e2);
}
}
stopLatch.countDown();
if (e != null) {
Throwables.throwIfInstanceOf(e, InterruptedException.class);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
stopLatch.await();
}
/**
* Calls {@link #stop()}. If the call to {@link #stop()} throws {@link InterruptedException}, this method sets
* the interrupt flag and throws an unchecked exception.
*/
public void stopUnchecked()
{
try {
stop();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@ -459,19 +548,33 @@ public class RunWorkOrder
writeDurableStorageSuccessFile();
}
listener.onSuccess(resultObject);
notifyListener(Either.value(resultObject));
}
@Override
public void onFailure(final Throwable t)
{
listener.onFailure(t);
notifyListener(Either.error(t));
}
},
Execs.directExecutor()
);
}
/**
* Notify {@link RunWorkOrderListener} that the job is done, if not already notified.
*/
private void notifyListener(final Either<Throwable, Object> result)
{
if (resultForListener.compareAndSet(null, result)) {
if (result.isError()) {
listener.onFailure(result.error());
} else {
listener.onSuccess(result.valueOrThrow());
}
}
}
/**
* Write {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} for a particular stage, if durable storage is enabled.
*/
@ -561,7 +664,7 @@ public class RunWorkOrder
)
{
return DurableStorageOutputChannelFactory.createStandardImplementation(
controllerTaskId,
workerContext.queryId(),
workOrder.getWorkerNumber(),
workOrder.getStageNumber(),
workerContext.workerId(),

View File

@ -25,7 +25,7 @@ import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import javax.annotation.Nullable;
/**
* Listener for various things that may happen during execution of {@link RunWorkOrder#start()}. Listener methods are
* Listener for various things that may happen during execution of {@link RunWorkOrder#startAsync()}. Listener methods are
* fired in processing threads, so they must be thread-safe, and it is important that they run quickly.
*/
public interface RunWorkOrderListener

View File

@ -367,28 +367,19 @@ public class WorkerImpl implements Worker
final WorkerStageKernel kernel = kernelHolder.kernel;
final WorkOrder workOrder = kernel.getWorkOrder();
final StageDefinition stageDefinition = workOrder.getStageDefinition();
final String cancellationId = cancellationIdFor(stageDefinition.getId());
final String cancellationId = cancellationIdFor(stageDefinition.getId(), workOrder.getWorkerNumber());
log.info(
"Processing work order for stage[%s]%s",
"Starting work order for stage[%s], workerNumber[%d]%s",
stageDefinition.getId(),
workOrder.getWorkerNumber(),
(log.isDebugEnabled()
? StringUtils.format(", payload[%s]", context.jsonMapper().writeValueAsString(workOrder)) : "")
);
final FrameContext frameContext = kernelHolder.processorCloser.register(context.frameContext(workOrder));
kernelHolder.processorCloser.register(() -> {
try {
workerExec.cancel(cancellationId);
}
catch (InterruptedException e) {
// Strange that cancellation would itself be interrupted. Log and suppress.
log.warn(e, "Cancellation interrupted for stage[%s]", stageDefinition.getId());
Thread.currentThread().interrupt();
}
});
final FrameContext frameContext = context.frameContext(workOrder);
// Set up cleanup functions for this work order.
// Set up resultsCloser (called when we are done reading results).
kernelHolder.resultsCloser.register(() -> FileUtils.deleteDirectory(frameContext.tempDir()));
kernelHolder.resultsCloser.register(() -> removeStageOutputChannels(stageDefinition.getId()));
@ -397,13 +388,9 @@ public class WorkerImpl implements Worker
final InputChannelFactory inputChannelFactory =
makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser);
// Start working on this stage immediately.
kernel.startReading();
final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty();
final boolean includeAllCounters = context.includeAllCounters();
final RunWorkOrder runWorkOrder = new RunWorkOrder(
task.getControllerTaskId(),
workOrder,
inputChannelFactory,
stageCounters.computeIfAbsent(
@ -419,7 +406,12 @@ public class WorkerImpl implements Worker
MultiStageQueryContext.removeNullBytes(queryContext)
);
runWorkOrder.start();
// Set up processorCloser (called when processing is done).
kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
// Start working on this stage immediately.
kernel.startReading();
runWorkOrder.startAsync();
kernelHolder.partitionBoundariesFuture = runWorkOrder.getStagePartitionBoundariesFuture();
}
@ -987,9 +979,9 @@ public class WorkerImpl implements Worker
/**
* Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}.
*/
private static String cancellationIdFor(final StageId stageId)
private static String cancellationIdFor(final StageId stageId, final int workerNumber)
{
return stageId.toString();
return StringUtils.format("%s_%s", stageId, workerNumber);
}
/**
@ -1244,10 +1236,19 @@ public class WorkerImpl implements Worker
private static class KernelHolder
{
private final WorkerStageKernel kernel;
private final Closer processorCloser;
private final Closer resultsCloser;
private SettableFuture<ClusterByPartitions> partitionBoundariesFuture;
/**
* Closer for processing. This is closed when all processing for a stage has completed.
*/
private final Closer processorCloser;
/**
* Closer for results. This is closed when results for a stage are no longer needed. Always closed
* *after* {@link #processorCloser} is done closing.
*/
private final Closer resultsCloser;
public KernelHolder(WorkerStageKernel kernel)
{
this.kernel = kernel;

View File

@ -20,6 +20,7 @@
package org.apache.druid.msq.querykit;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
@ -47,19 +48,21 @@ public class FrameProcessorTestBase extends InitializedNullHandlingTest
{
protected static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0);
private ListeningExecutorService innerExec;
protected FrameProcessorExecutor exec;
@Before
public void setUp()
{
exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
innerExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec"));
exec = new FrameProcessorExecutor(innerExec);
}
@After
public void tearDown() throws Exception
{
exec.getExecutorService().shutdownNow();
exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
innerExec.shutdownNow();
innerExec.awaitTermination(10, TimeUnit.MINUTES);
}
protected ReadableInput makeChannelFromCursorFactory(

View File

@ -46,12 +46,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@ -61,7 +63,6 @@ import java.util.stream.Collectors;
* If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a
* same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks.
*/
@SuppressWarnings("CheckReturnValue")
public class FrameProcessorExecutor
{
private static final Logger log = new Logger(FrameProcessorExecutor.class);
@ -70,6 +71,10 @@ public class FrameProcessorExecutor
private final Object lock = new Object();
// Currently-active cancellationIds.
@GuardedBy("lock")
private final Set<String> activeCancellationIds = new HashSet<>();
// Futures that are active and therefore cancelable.
// Does not include return futures: those are in cancelableReturnFutures.
@GuardedBy("lock")
@ -96,19 +101,12 @@ public class FrameProcessorExecutor
this.exec = exec;
}
/**
* Returns the underlying executor service used by this executor.
*/
public ListeningExecutorService getExecutorService()
{
return exec;
}
/**
* Runs a processor until it is done, and returns a future that resolves when execution is complete.
*
* If "cancellationId" is provided, it can be used with the {@link #cancel(String)} method to cancel all processors
* currently running with the same cancellationId.
* If "cancellationId" is provided, it must have previously been registered with {@link #registerCancellationId}.
* Then, it can be used with the {@link #cancel(String)} method to cancel all processors with that
* same cancellationId.
*/
public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, @Nullable final String cancellationId)
{
@ -116,6 +114,11 @@ public class FrameProcessorExecutor
final List<WritableFrameChannel> outputChannels = processor.outputChannels();
final SettableFuture<T> finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId);
if (finished.isDone()) {
// Possibly due to starting life out being canceled.
return finished;
}
class ExecutorRunnable implements Runnable
{
private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels);
@ -152,7 +155,7 @@ public class FrameProcessorExecutor
final IntSet await = result.awaitSet();
if (await.isEmpty()) {
exec.submit(ExecutorRunnable.this);
exec.execute(ExecutorRunnable.this);
} else if (result.isAwaitAll() || await.size() == 1) {
final List<ListenableFuture<?>> readabilityFutures = new ArrayList<>();
@ -164,7 +167,7 @@ public class FrameProcessorExecutor
}
if (readabilityFutures.isEmpty()) {
exec.submit(ExecutorRunnable.this);
exec.execute(ExecutorRunnable.this);
} else {
runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
}
@ -272,7 +275,7 @@ public class FrameProcessorExecutor
public void onSuccess(final V ignored)
{
try {
exec.submit(ExecutorRunnable.this);
exec.execute(ExecutorRunnable.this);
}
catch (Throwable e) {
fail(e);
@ -390,7 +393,7 @@ public class FrameProcessorExecutor
logProcessorStatusString(processor, finished, null);
registerCancelableProcessor(processor, cancellationId);
exec.submit(runnable);
exec.execute(runnable);
return finished;
}
@ -423,8 +426,20 @@ public class FrameProcessorExecutor
}
/**
* Cancels all processors associated with a given cancellationId. Waits for the processors to exit before
* returning.
* Registers a cancellationId, so it can be provided to {@link #runFully} or {@link #runAllFully}. To avoid the
* set of active cancellationIds growing without bound, callers must also call {@link #cancel(String)} on the
* same cancellationId when done using it.
*/
public void registerCancellationId(final String cancellationId)
{
synchronized (lock) {
activeCancellationIds.add(cancellationId);
}
}
/**
* Deregisters a cancellationId and cancels any currently-running processors associated with that cancellationId.
* Waits for any canceled processors to exit before returning.
*/
public void cancel(final String cancellationId) throws InterruptedException
{
@ -435,6 +450,7 @@ public class FrameProcessorExecutor
final Set<ListenableFuture<?>> returnFuturesToCancel;
synchronized (lock) {
activeCancellationIds.remove(cancellationId);
futuresToCancel = cancelableFutures.removeAll(cancellationId);
processorsToCancel = cancelableProcessors.removeAll(cancellationId);
returnFuturesToCancel = cancelableReturnFutures.removeAll(cancellationId);
@ -457,6 +473,33 @@ public class FrameProcessorExecutor
}
}
/**
* Returns an {@link Executor} that executes using the same underlying service, and that is also connected to
* cancellation through {@link #cancel(String)}.
*
* @param cancellationId cancellation ID for the executor
*/
public Executor asExecutor(@Nullable final String cancellationId)
{
return command -> runFully(new RunnableFrameProcessor(command), cancellationId);
}
/**
* Shuts down the underlying executor service immediately.
*/
public void shutdownNow()
{
exec.shutdownNow();
}
/**
* Returns the underlying executor service used by this executor.
*/
ListeningExecutorService getExecutorService()
{
return exec;
}
/**
* Register a future that will be canceled when the provided {@code cancellationId} is canceled.
*
@ -472,6 +515,12 @@ public class FrameProcessorExecutor
{
if (cancellationId != null) {
synchronized (lock) {
if (!activeCancellationIds.contains(cancellationId)) {
// Cancel and return immediately.
future.cancel(true);
return future;
}
final SetMultimap<String, ListenableFuture<?>> map = isReturn ? cancelableReturnFutures : cancelableFutures;
map.put(cancellationId, future);
future.addListener(

View File

@ -306,9 +306,11 @@ public class RunAllFullyWidget<T, ResultType>
}
if (isDone) {
finished.compareAndSet(null, Either.value(processorManager.result()));
synchronized (runAllFullyLock) {
if (finished.get() == null) {
finished.compareAndSet(null, Either.value(processorManager.result()));
}
cleanupIfNoMoreProcessors();
}
} else {

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.processor;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import java.util.Collections;
import java.util.List;
/**
* Frame processor that simply runs a {@link Runnable}, once.
*/
public class RunnableFrameProcessor implements FrameProcessor<Void>
{
private final Runnable runnable;
public RunnableFrameProcessor(Runnable runnable)
{
this.runnable = runnable;
}
@Override
public List<ReadableFrameChannel> inputChannels()
{
return Collections.emptyList();
}
@Override
public List<WritableFrameChannel> outputChannels()
{
return Collections.emptyList();
}
@Override
public ReturnOrAwait<Void> runIncrementally(IntSet readableInputs)
{
runnable.run();
return ReturnOrAwait.returnObject(null);
}
@Override
public void cleanup()
{
// Nothing to do.
}
}

View File

@ -297,7 +297,7 @@ public class SuperSorter
setAllDoneIfPossible();
}
},
exec.getExecutorService()
exec.asExecutor(cancellationId)
);
return FutureUtils.futureWithBaggage(
@ -813,7 +813,7 @@ public class SuperSorter
},
// Must run in exec, instead of in the same thread, to avoid running callback immediately if the
// worker happens to finish super-quickly.
exec.getExecutorService()
exec.asExecutor(cancellationId)
);
}

View File

@ -222,6 +222,7 @@ public class FrameProcessorExecutorTest
final SettableFuture<Object> future = SettableFuture.create();
final String cancellationId = "xyzzy";
exec.registerCancellationId(cancellationId);
Assert.assertSame(future, exec.registerCancelableFuture(future, false, cancellationId));
exec.cancel(cancellationId);
@ -236,6 +237,8 @@ public class FrameProcessorExecutorTest
{
final SleepyFrameProcessor processor = new SleepyFrameProcessor();
final String cancellationId = "xyzzy";
exec.registerCancellationId(cancellationId);
final ListenableFuture<Long> future = exec.runFully(processor, cancellationId);
processor.awaitRun();
@ -254,6 +257,8 @@ public class FrameProcessorExecutorTest
{
final SleepyFrameProcessor processor = new SleepyFrameProcessor();
final String cancellationId = "xyzzy";
exec.registerCancellationId(cancellationId);
final ListenableFuture<Long> future = exec.runFully(processor, cancellationId);
processor.awaitRun();
@ -314,6 +319,8 @@ public class FrameProcessorExecutorTest
// Start up all systems at once.
for (final String systemId : systemGeneratorsMap.keySet()) {
exec.registerCancellationId(systemId);
for (InfiniteFrameProcessor generator : systemGeneratorsMap.get(systemId)) {
processorFutureMap.put(generator, exec.runFully(generator, systemId));
}
@ -391,6 +398,22 @@ public class FrameProcessorExecutorTest
// Just making sure no error is thrown when we refer to a nonexistent cancellationId.
exec.cancel("nonexistent");
}
@Test
public void test_runFully_nonexistentCancellationId()
{
final SleepyFrameProcessor processor = new SleepyFrameProcessor();
final String cancellationId = "xyzzy";
// Don't registerCancellationId(cancellationId).
final ListenableFuture<Long> future = exec.runFully(processor, cancellationId);
// Future should be immediately canceled, without running the processor.
Assert.assertTrue(future.isDone());
Assert.assertTrue(future.isCancelled());
Assert.assertFalse(processor.didGetInterrupt());
Assert.assertFalse(processor.didCleanup());
}
}
public abstract static class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest

View File

@ -409,6 +409,8 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP
.mapToObj(i -> new SleepyFrameProcessor())
.collect(Collectors.toList());
final String cancellationId = "xyzzy";
exec.registerCancellationId(cancellationId);
final ListenableFuture<Long> future = exec.runAllFully(
possiblyDelay(
ensureClose(
@ -418,7 +420,7 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP
),
maxOutstandingProcessors,
bouncer,
"xyzzy"
cancellationId
);
for (int i = 0; i < expectedRunningProcessors; i++) {