diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
index b3675f0e047..afd1ece4dad 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
@@ -73,8 +73,12 @@ public interface ControllerClient extends AutoCloseable
   void postWorkerWarning(
       List MSQErrorReports
   ) throws IOException;
+
   List getTaskList() throws IOException;
 
+  /**
+   * Close this client. Idempotent.
+   */
   @Override
   void close();
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 3f2ef39b5bf..e76169f7042 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -498,20 +498,16 @@ public class WorkerImpl implements Worker
   @Override
   public void stopGracefully()
   {
-    log.info("Stopping gracefully for taskId [%s]", task.getId());
-    kernelManipulationQueue.add(
-        kernel -> {
-          // stopGracefully() is called when the containing process is terminated, or when the task is canceled.
-          throw new MSQException(CanceledFault.INSTANCE);
-        }
-    );
+    // stopGracefully() is called when the containing process is terminated, or when the task is canceled.
+    log.info("Worker task[%s] canceled.", task.getId());
+    doCancel();
   }
 
   @Override
   public void controllerFailed()
   {
-    controllerAlive = false;
-    stopGracefully();
+    log.info("Controller task[%s] for worker task[%s] failed. Canceling.", task.getControllerTaskId(), task.getId());
+    doCancel();
   }
 
   @Override
@@ -909,6 +905,31 @@ public class WorkerImpl implements Worker
     }
   }
 
+  /**
+   * Called by {@link #stopGracefully()} (task canceled, or containing process shut down) and
+   * {@link #controllerFailed()}.
+   */
+  private void doCancel()
+  {
+    // Set controllerAlive = false so we don't try to contact the controller after being canceled. If it canceled us,
+    // it doesn't need to know that we were canceled. If we were canceled by something else, the controller will
+    // detect this as part of its monitoring of workers.
+    controllerAlive = false;
+
+    // Close controller client to cancel any currently in-flight calls to the controller.
+    if (controllerClient != null) {
+      controllerClient.close();
+    }
+
+    // Clear the main loop event queue, then throw a CanceledFault into the loop to exit it promptly.
+    kernelManipulationQueue.clear();
+    kernelManipulationQueue.add(
+        kernel -> {
+          throw new MSQException(CanceledFault.INSTANCE);
+        }
+    );
+  }
+
   /**
    * Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
    */