mirror of https://github.com/apache/druid.git
MSQ: Cancel workers more quickly. (#16158)
Prior to this patch, when canceled, workers would keep trying to contact the controller: they would attempt to report an error, and if they were in the midst of some other call (like a counters push) they would keep trying it. This can cause cancellation to be delayed, because the controller shuts down its HTTP server before it cancels workers. Workers are then stuck retrying calls to the controller that will never succeed. The retry loops are broken when the controller gives up on them (one minute later) and exits for real. Then, the controller failure detection logic on the worker detects that the controller has failed, and the worker finally shuts down. This patch speeds up worker cancellation by bypassing communication with the controller. There is no real need for it. If the controller canceled the workers, it isn't interested in further communications from them. If the workers were canceled out-of-band, the controller can detect this through worker monitoring and report it as a WorkerFailed error.
This commit is contained in:
parent
5afd5c41a5
commit
8ee324c7e7
|
@ -73,8 +73,12 @@ public interface ControllerClient extends AutoCloseable
|
|||
void postWorkerWarning(
|
||||
List<MSQErrorReport> MSQErrorReports
|
||||
) throws IOException;
|
||||
|
||||
List<String> getTaskList() throws IOException;
|
||||
|
||||
/**
|
||||
* Close this client. Idempotent.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
|
|
|
@ -498,20 +498,16 @@ public class WorkerImpl implements Worker
|
|||
@Override
|
||||
public void stopGracefully()
|
||||
{
|
||||
log.info("Stopping gracefully for taskId [%s]", task.getId());
|
||||
kernelManipulationQueue.add(
|
||||
kernel -> {
|
||||
// stopGracefully() is called when the containing process is terminated, or when the task is canceled.
|
||||
throw new MSQException(CanceledFault.INSTANCE);
|
||||
}
|
||||
);
|
||||
// stopGracefully() is called when the containing process is terminated, or when the task is canceled.
|
||||
log.info("Worker task[%s] canceled.", task.getId());
|
||||
doCancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void controllerFailed()
|
||||
{
|
||||
controllerAlive = false;
|
||||
stopGracefully();
|
||||
log.info("Controller task[%s] for worker task[%s] failed. Canceling.", task.getControllerTaskId(), task.getId());
|
||||
doCancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -909,6 +905,31 @@ public class WorkerImpl implements Worker
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by {@link #stopGracefully()} (task canceled, or containing process shut down) and
|
||||
* {@link #controllerFailed()}.
|
||||
*/
|
||||
private void doCancel()
|
||||
{
|
||||
// Set controllerAlive = false so we don't try to contact the controller after being canceled. If it canceled us,
|
||||
// it doesn't need to know that we were canceled. If we were canceled by something else, the controller will
|
||||
// detect this as part of its monitoring of workers.
|
||||
controllerAlive = false;
|
||||
|
||||
// Close controller client to cancel any currently in-flight calls to the controller.
|
||||
if (controllerClient != null) {
|
||||
controllerClient.close();
|
||||
}
|
||||
|
||||
// Clear the main loop event queue, then throw a CanceledFault into the loop to exit it promptly.
|
||||
kernelManipulationQueue.clear();
|
||||
kernelManipulationQueue.add(
|
||||
kernel -> {
|
||||
throw new MSQException(CanceledFault.INSTANCE);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue