From 8ee324c7e73ef6c64c55f0b2b93bf3e198528234 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Mar 2024 01:51:22 -0700 Subject: [PATCH] MSQ: Cancel workers more quickly. (#16158) Prior to this patch, when canceled, workers would keep trying to contact the controller: they would attempt to report an error, and if they were in the midst of some other call (like a counters push) they would keep trying it. This can cause cancellation to be delayed, because the controller shuts down its HTTP server before it cancels workers. Workers are then stuck retrying calls to the controller that will never succeed. The retry loops are broken when the controller gives up on them (one minute later) and exits for real. Then, the controller failure detection logic on the worker detects that the controller has failed, and the worker finally shuts down. This patch speeds up worker cancellation by bypassing communication with the controller. There is no real need for it. If the controller canceled the workers, it isn't interested in further communications from them. If the workers were canceled out-of-band, the controller can detect this through worker monitoring and report it as a WorkerFailed error. --- .../druid/msq/exec/ControllerClient.java | 4 ++ .../org/apache/druid/msq/exec/WorkerImpl.java | 39 ++++++++++++++----- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java index b3675f0e047..afd1ece4dad 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java @@ -73,8 +73,12 @@ public interface ControllerClient extends AutoCloseable void postWorkerWarning( List MSQErrorReports ) throws IOException; + List getTaskList() throws IOException; + /** + * Close this client. Idempotent. + */ @Override void close(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 3f2ef39b5bf..e76169f7042 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -498,20 +498,16 @@ public class WorkerImpl implements Worker @Override public void stopGracefully() { - log.info("Stopping gracefully for taskId [%s]", task.getId()); - kernelManipulationQueue.add( - kernel -> { - // stopGracefully() is called when the containing process is terminated, or when the task is canceled. - throw new MSQException(CanceledFault.INSTANCE); - } - ); + // stopGracefully() is called when the containing process is terminated, or when the task is canceled. + log.info("Worker task[%s] canceled.", task.getId()); + doCancel(); } @Override public void controllerFailed() { - controllerAlive = false; - stopGracefully(); + log.info("Controller task[%s] for worker task[%s] failed. Canceling.", task.getControllerTaskId(), task.getId()); + doCancel(); } @Override @@ -909,6 +905,31 @@ public class WorkerImpl implements Worker } } + /** + * Called by {@link #stopGracefully()} (task canceled, or containing process shut down) and + * {@link #controllerFailed()}. + */ + private void doCancel() + { + // Set controllerAlive = false so we don't try to contact the controller after being canceled. If it canceled us, + // it doesn't need to know that we were canceled. If we were canceled by something else, the controller will + // detect this as part of its monitoring of workers. + controllerAlive = false; + + // Close controller client to cancel any currently in-flight calls to the controller. + if (controllerClient != null) { + controllerClient.close(); + } + + // Clear the main loop event queue, then throw a CanceledFault into the loop to exit it promptly. + kernelManipulationQueue.clear(); + kernelManipulationQueue.add( + kernel -> { + throw new MSQException(CanceledFault.INSTANCE); + } + ); + } + /** * Log (at DEBUG level) a string explaining the status of all work assigned to this worker. */