From b8f7128b2d8f340d40ddabff7fea3282e82fa606 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 14 Apr 2020 20:42:56 -0700 Subject: [PATCH] Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)" (#9702) * Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)" This reverts commit 072bbe210f162228e85391b464f114da448df24a. * fix build --- .../druid/discovery/DruidLeaderClient.java | 19 ++++++++++++++++- .../guice/CoordinatorDiscoveryModule.java | 19 +++++++++++++++-- .../guice/IndexingServiceDiscoveryModule.java | 19 +++++++++++++++-- .../discovery/DruidLeaderClientTest.java | 21 +++++++++++++------ .../coordinator/duty/CompactSegmentsTest.java | 2 +- .../druid/sql/calcite/util/CalciteTests.java | 5 ++++- 6 files changed, 72 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java index ba693e7154f..95b4fa6a32c 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java @@ -22,6 +22,8 @@ package org.apache.druid.discovery; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.selector.DiscoverySelector; +import org.apache.druid.client.selector.Server; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; @@ -70,6 +72,9 @@ public class DruidLeaderClient private final String leaderRequestPath; + //Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future. + private final DiscoverySelector serverDiscoverySelector; + private LifecycleLock lifecycleLock = new LifecycleLock(); private DruidNodeDiscovery druidNodeDiscovery; private AtomicReference currentKnownLeader = new AtomicReference<>(); @@ -78,13 +83,15 @@ public class DruidLeaderClient HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, NodeRole nodeRoleToWatch, - String leaderRequestPath + String leaderRequestPath, + DiscoverySelector serverDiscoverySelector ) { this.httpClient = httpClient; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.nodeRoleToWatch = nodeRoleToWatch; this.leaderRequestPath = leaderRequestPath; + this.serverDiscoverySelector = serverDiscoverySelector; } @LifecycleStart @@ -296,6 +303,16 @@ public class DruidLeaderClient @Nullable private String pickOneHost() { + Server server = serverDiscoverySelector.pick(); + if (server != null) { + return StringUtils.format( + "%s://%s:%s", + server.getScheme(), + server.getAddress(), + server.getPort() + ); + } + Iterator iter = druidNodeDiscovery.getAllNodes().iterator(); if (iter.hasNext()) { DiscoveryDruidNode node = iter.next(); diff --git a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java index d9f083767df..b90a9b5e76f 100644 --- a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java @@ -24,6 +24,8 @@ import com.google.inject.Module; import com.google.inject.Provides; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.coordinator.CoordinatorSelectorConfig; +import org.apache.druid.curator.discovery.ServerDiscoveryFactory; +import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; @@ -40,19 +42,32 @@ public class CoordinatorDiscoveryModule implements Module JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class); } + @Provides + @Coordinator + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( + CoordinatorSelectorConfig config, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + return serverDiscoveryFactory.createSelector(config.getServiceName()); + } + @Provides @Coordinator @ManageLifecycle public DruidLeaderClient getLeaderHttpClient( @EscalatedGlobal HttpClient httpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @Coordinator ServerDiscoverySelector serverDiscoverySelector ) { return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, NodeRole.COORDINATOR, - "/druid/coordinator/v1/leader" + "/druid/coordinator/v1/leader", + serverDiscoverySelector ); } } diff --git a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java index 3e480781391..3c4f63c5404 100644 --- a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java @@ -24,6 +24,8 @@ import com.google.inject.Module; import com.google.inject.Provides; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.client.indexing.IndexingServiceSelectorConfig; +import org.apache.druid.curator.discovery.ServerDiscoveryFactory; +import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; @@ -40,19 +42,32 @@ public class IndexingServiceDiscoveryModule implements Module JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); } + @Provides + @IndexingService + @ManageLifecycle + public ServerDiscoverySelector getServiceProvider( + IndexingServiceSelectorConfig config, + ServerDiscoveryFactory serverDiscoveryFactory + ) + { + return serverDiscoveryFactory.createSelector(config.getServiceName()); + } + @Provides @IndexingService @ManageLifecycle public DruidLeaderClient getLeaderHttpClient( @EscalatedGlobal HttpClient httpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + @IndexingService ServerDiscoverySelector serverDiscoverySelector ) { return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, NodeRole.OVERLORD, - "/druid/indexer/v1/leader" + "/druid/indexer/v1/leader", + serverDiscoverySelector ); } } diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java index 570720db48f..7c376ea20b6 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java @@ -29,6 +29,7 @@ import com.google.inject.Module; import com.google.inject.name.Named; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; +import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; @@ -122,7 +123,8 @@ public class DruidLeaderClientTest extends BaseJettyTest httpClient, druidNodeDiscoveryProvider, NodeRole.PEON, - "/simple/leader" + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); @@ -146,7 +148,8 @@ public class DruidLeaderClientTest extends BaseJettyTest httpClient, druidNodeDiscoveryProvider, NodeRole.PEON, - "/simple/leader" + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); @@ -172,7 +175,8 @@ public class DruidLeaderClientTest extends BaseJettyTest httpClient, druidNodeDiscoveryProvider, NodeRole.PEON, - "/simple/leader" + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); @@ -184,6 +188,9 @@ public class DruidLeaderClientTest extends BaseJettyTest @Test public void testServerFailureAndRedirect() throws Exception { + ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class); + EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes(); + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); DiscoveryDruidNode dummyNode = new DiscoveryDruidNode( new DruidNode("test", "dummyhost", false, 64231, null, true, false), @@ -196,13 +203,14 @@ public class DruidLeaderClientTest extends BaseJettyTest DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery).anyTimes(); - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, NodeRole.PEON, - "/simple/leader" + "/simple/leader", + serverDiscoverySelector ); druidLeaderClient.start(); @@ -228,7 +236,8 @@ public class DruidLeaderClientTest extends BaseJettyTest httpClient, druidNodeDiscoveryProvider, NodeRole.PEON, - "/simple/leader" + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 683e776fc51..43c38556972 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -360,7 +360,7 @@ public class CompactSegmentsTest private TestDruidLeaderClient(ObjectMapper jsonMapper) { - super(null, new TestNodeDiscoveryProvider(), null, null); + super(null, new TestNodeDiscoveryProvider(), null, null, null); this.jsonMapper = jsonMapper; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 2792b2d30c1..08742812f89 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -773,7 +773,10 @@ public class CalciteTests new FakeHttpClient(), provider, NodeRole.COORDINATOR, - "/simple/leader" + "/simple/leader", + () -> { + throw new UnsupportedOperationException(); + } ); return new SystemSchema(