From 2b23d0b5b52f1184e1642c5b5b15af4f4aa216ad Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Mar 2024 19:25:48 -0700 Subject: [PATCH] MSQ: Controller checker should check for "closed" only. (#16161) * MSQ: Controller checker should check for "closed" only. Currently, the worker's controller checker will exit the worker if the controller location is "closed" (no longer running) or if its location is empty (i.e. location unknown). This patch changes to only exit on "closed". We shouldn't exit on empty location, because that may happen if the Overlord is slow to acknowledge the location of a task. * Fix test. --- .../msq/indexing/IndexerWorkerContext.java | 4 +++- .../msq/indexing/IndexerWorkerContextTest.java | 17 ++--------------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 53cd6e942ea..1bd789df769 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -190,7 +190,9 @@ public class IndexerWorkerContext implements WorkerContext break; } - if (controllerLocations.isClosed() || controllerLocations.getLocations().isEmpty()) { + // Note: don't exit on empty location, because that may happen if the Overlord is slow to acknowledge the + // location of a task. Only exit on "closed", because that happens only if the task is really no longer running. + if (controllerLocations.isClosed()) { log.warn( "Periodic fetch of controller location returned [%s]. Worker task [%s] will exit.", controllerLocations, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 2ae8d155d4d..583c21d3407 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -54,20 +54,6 @@ public class IndexerWorkerContextTest ); } - @Test - public void testControllerCheckerRunnableExitsWhenEmptyStatus() - { - final ServiceLocator controllerLocatorMock = Mockito.mock(ServiceLocator.class); - Mockito.when(controllerLocatorMock.locate()) - .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet()))); - - final Worker workerMock = Mockito.mock(Worker.class); - - indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock); - Mockito.verify(controllerLocatorMock, Mockito.times(1)).locate(); - Mockito.verify(workerMock, Mockito.times(1)).controllerFailed(); - } - @Test public void testControllerCheckerRunnableExitsOnlyWhenClosedStatus() { @@ -76,12 +62,13 @@ public class IndexerWorkerContextTest .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/")))) // Done to check the behavior of the runnable, the situation of exiting after success might not occur actually .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/")))) + .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet()))) .thenReturn(Futures.immediateFuture(ServiceLocations.closed())); final Worker workerMock = Mockito.mock(Worker.class); indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock); - Mockito.verify(controllerLocatorMock, Mockito.times(3)).locate(); + Mockito.verify(controllerLocatorMock, Mockito.times(4)).locate(); Mockito.verify(workerMock, Mockito.times(1)).controllerFailed(); } }