diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 53cd6e942ea..1bd789df769 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -190,7 +190,9 @@ public class IndexerWorkerContext implements WorkerContext break; } - if (controllerLocations.isClosed() || controllerLocations.getLocations().isEmpty()) { + // Note: don't exit on empty location, because that may happen if the Overlord is slow to acknowledge the + // location of a task. Only exit on "closed", because that happens only if the task is really no longer running. + if (controllerLocations.isClosed()) { log.warn( "Periodic fetch of controller location returned [%s]. Worker task [%s] will exit.", controllerLocations, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 2ae8d155d4d..583c21d3407 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -54,20 +54,6 @@ public class IndexerWorkerContextTest ); } - @Test - public void testControllerCheckerRunnableExitsWhenEmptyStatus() - { - final ServiceLocator controllerLocatorMock = Mockito.mock(ServiceLocator.class); - Mockito.when(controllerLocatorMock.locate()) - .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet()))); - - final Worker workerMock = Mockito.mock(Worker.class); - - indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock); - Mockito.verify(controllerLocatorMock, Mockito.times(1)).locate(); - Mockito.verify(workerMock, Mockito.times(1)).controllerFailed(); - } - @Test public void testControllerCheckerRunnableExitsOnlyWhenClosedStatus() { @@ -76,12 +62,13 @@ public class IndexerWorkerContextTest .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/")))) // Done to check the behavior of the runnable, the situation of exiting after success might not occur actually .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/")))) + .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet()))) .thenReturn(Futures.immediateFuture(ServiceLocations.closed())); final Worker workerMock = Mockito.mock(Worker.class); indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock); - Mockito.verify(controllerLocatorMock, Mockito.times(3)).locate(); + Mockito.verify(controllerLocatorMock, Mockito.times(4)).locate(); Mockito.verify(workerMock, Mockito.times(1)).controllerFailed(); } }