diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 6eb1a9c28df..65f24770806 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -31,6 +31,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -574,7 +575,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ); } - private void addWorker(final Worker worker) + @VisibleForTesting + void addWorker(final Worker worker) { synchronized (workers) { log.info("Worker[%s] reportin' for duty!", worker.getHost()); @@ -752,23 +754,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer log.debug("Running the Sync Monitoring."); try { - for (Map.Entry e : workers.entrySet()) { - WorkerHolder workerHolder = e.getValue(); - if (!workerHolder.getUnderlyingSyncer().isOK()) { - synchronized (workers) { - // check again that server is still there and only then reset. - if (workers.containsKey(e.getKey())) { - log.makeAlert( - "Worker[%s] is not syncing properly. Current state is [%s]. 
Resetting it.", - workerHolder.getWorker().getHost(), - workerHolder.getUnderlyingSyncer().getDebugInfo() - ).emit(); - removeWorker(workerHolder.getWorker()); - addWorker(workerHolder.getWorker()); - } - } - } - } + syncMonitoring(); } catch (Exception ex) { if (ex instanceof InterruptedException) { @@ -784,6 +770,30 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer ); } + @VisibleForTesting + void syncMonitoring() + { + // Iterate over a copy so that the remove/re-add reset below cannot modify the collection being iterated. + final Set> workerEntrySet = ImmutableSet.copyOf(workers.entrySet()); + for (Map.Entry e : workerEntrySet) { + WorkerHolder workerHolder = e.getValue(); + if (!workerHolder.getUnderlyingSyncer().isOK()) { + synchronized (workers) { + // check again that worker is still there and only then reset. + if (workers.containsKey(e.getKey())) { + log.makeAlert( + "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.", + workerHolder.getWorker().getHost(), + workerHolder.getUnderlyingSyncer().getDebugInfo() + ).emit(); + removeWorker(workerHolder.getWorker()); + addWorker(workerHolder.getWorker()); + } + } + } + } + } + /** * This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant * for that use only. It must not be used for any other purpose. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index e78b78517ad..fe1b0ca498f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -58,6 +58,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.coordination.ChangeRequestHttpSyncer; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -70,6 +71,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -1668,6 +1670,52 @@ public class HttpRemoteTaskRunnerTest } + @Test(timeout = 60_000L) + public void testSyncMonitoring_finiteIteration() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + new NoopProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + 
EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ) + { + @Override + protected WorkerHolder createWorkerHolder( + ObjectMapper smileMapper, + HttpClient httpClient, + HttpRemoteTaskRunnerConfig config, + ScheduledExecutorService workersSyncExec, + WorkerHolder.Listener listener, + Worker worker, + List knownAnnouncements + ) + { + return createNonSyncingWorkerHolder(worker); + } + }; + + taskRunner.start(); + taskRunner.addWorker(createWorker("abc")); + taskRunner.addWorker(createWorker("xyz")); + taskRunner.addWorker(createWorker("lol")); + Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size()); + taskRunner.syncMonitoring(); + Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size()); + } + public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( TaskStorage taskStorage, List listenerNotificationsAccumulator @@ -1730,6 +1778,30 @@ public class HttpRemoteTaskRunnerTest return taskRunner; } + private Worker createWorker(String host) + { + Worker worker = EasyMock.createMock(Worker.class); + EasyMock.expect(worker.getHost()).andReturn(host).anyTimes(); + EasyMock.replay(worker); + return worker; + } + + private WorkerHolder createNonSyncingWorkerHolder(Worker worker) + { + ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class); + EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes(); + EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes(); + WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); + EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes(); + EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); + workerHolder.start(); + EasyMock.expectLastCall(); + workerHolder.stop(); + EasyMock.expectLastCall(); + EasyMock.replay(syncer, workerHolder); + return 
workerHolder; + } + private static WorkerHolder createWorkerHolder( ObjectMapper smileMapper, HttpClient httpClient, diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index decab1f7cc7..893d455dc7a 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -21,11 +21,13 @@ package org.apache.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; import org.apache.druid.concurrent.LifecycleLock; @@ -58,6 +60,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -375,7 +378,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer ); } - private void serverAdded(DruidServer server) + @VisibleForTesting + void serverAdded(DruidServer server) { synchronized (servers) { DruidServerHolder holder = servers.get(server.getName()); @@ -430,31 +434,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer log.debug("Running the Sync Monitoring."); try { - for (Map.Entry e : servers.entrySet()) { - DruidServerHolder serverHolder = e.getValue(); - if (!serverHolder.syncer.isOK()) { - synchronized (servers) { - // check again that server is 
still there and only then reset. - if (servers.containsKey(e.getKey())) { - log.makeAlert( - "Server[%s] is not syncing properly. Current state is [%s]. Resetting it.", - serverHolder.druidServer.getName(), - serverHolder.syncer.getDebugInfo() - ).emit(); - serverRemoved(serverHolder.druidServer); - serverAdded(new DruidServer( - serverHolder.druidServer.getName(), - serverHolder.druidServer.getHostAndPort(), - serverHolder.druidServer.getHostAndTlsPort(), - serverHolder.druidServer.getMaxSize(), - serverHolder.druidServer.getType(), - serverHolder.druidServer.getTier(), - serverHolder.druidServer.getPriority() - )); - } - } - } - } + syncMonitoring(); } catch (Exception ex) { if (ex instanceof InterruptedException) { @@ -470,6 +450,38 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer ); } + @VisibleForTesting + void syncMonitoring() + { + // Iterate over a copy so that the remove/re-add reset below cannot modify the collection being iterated. + final Set> serverEntrySet = ImmutableSet.copyOf(servers.entrySet()); + for (Map.Entry e : serverEntrySet) { + DruidServerHolder serverHolder = e.getValue(); + if (!serverHolder.syncer.isOK()) { + synchronized (servers) { + // check again that server is still there and only then reset. + if (servers.containsKey(e.getKey())) { + log.makeAlert( + "Server[%s] is not syncing properly. Current state is [%s]. 
Resetting it.", + serverHolder.druidServer.getName(), + serverHolder.syncer.getDebugInfo() + ).emit(); + serverRemoved(serverHolder.druidServer); + serverAdded(new DruidServer( + serverHolder.druidServer.getName(), + serverHolder.druidServer.getHostAndPort(), + serverHolder.druidServer.getHostAndTlsPort(), + serverHolder.druidServer.getMaxSize(), + serverHolder.druidServer.getType(), + serverHolder.druidServer.getTier(), + serverHolder.druidServer.getPriority() + )); + } + } + } + } + } + @Override public boolean isStarted() { diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index 896f90d67dd..b08db90c2ca 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -273,6 +273,50 @@ public class HttpServerInventoryViewTest httpServerInventoryView.stop(); } + @Test(timeout = 60_000L) + public void testSyncMonitoring() + { + ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + TestHttpClient httpClient = new TestHttpClient(ImmutableList.of()); + + HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView( + jsonMapper, + httpClient, + druidNodeDiscoveryProvider, + (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"), + new HttpServerInventoryViewConfig(null, null, null), + "test" + ); + + httpServerInventoryView.start(); + httpServerInventoryView.serverAdded(makeServer("abc.com:8080")); + 
httpServerInventoryView.serverAdded(makeServer("xyz.com:8080")); + httpServerInventoryView.serverAdded(makeServer("lol.com:8080")); + Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size()); + httpServerInventoryView.syncMonitoring(); + Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size()); + } + + private DruidServer makeServer(String host) + { + return new DruidServer( + host, + host, + host, + 100_000_000L, + ServerType.HISTORICAL, + "__default_tier", + 50 + ); + } + private static class TestDruidNodeDiscovery implements DruidNodeDiscovery { Listener listener;