diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java index 73a940f05ea..fcfc2be4e50 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -57,13 +58,17 @@ public class CommonCacheNotifier { private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class); - private static final List NODE_TYPES = Arrays.asList( - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, - DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, - DruidNodeDiscoveryProvider.NODE_TYPE_MM + /** + * {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly + * from metadata storage. + */ + private static final List NODE_TYPES = Arrays.asList( + NodeType.BROKER, + NodeType.OVERLORD, + NodeType.HISTORICAL, + NodeType.PEON, + NodeType.ROUTER, + NodeType.MIDDLE_MANAGER ); private final DruidNodeDiscoveryProvider discoveryProvider; @@ -154,7 +159,7 @@ public class CommonCacheNotifier private List> sendUpdate(String updatedAuthorizerPrefix, byte[] serializedUserMap) { List> futures = new ArrayList<>(); - for (String nodeType : NODE_TYPES) { + for (NodeType nodeType : NODE_TYPES) { DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType); Collection nodes = nodeDiscovery.getAllNodes(); for (DiscoveryDruidNode node : nodes) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index e815cfa3016..34eca06c626 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -40,8 +40,8 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -310,7 +310,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask new LookupNodeService(lookupTier); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index aa50e687d43..88dfe70ef7b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -31,8 +31,8 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -255,7 +255,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner new LookupNodeService(lookupTier); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index bc6af54f00a..b7ab3a0ff77 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -37,8 +37,8 @@ import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; @@ -672,7 +672,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER)); return new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 6a44c04499d..83130e6c6ab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -32,8 +32,8 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -364,7 +364,7 @@ public class RealtimeIndexTask extends AbstractTask new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER)); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 2047d53633f..b3c6f77985c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -41,6 +41,7 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -445,7 +446,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer private void startWorkersHandling() throws InterruptedException { final CountDownLatch workerViewInitialized = new CountDownLatch(1); - DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM); + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER); druidNodeDiscovery.registerListener( new DruidNodeDiscovery.Listener() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 3f7b5c3c6f2..86194f13dc6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -84,7 +85,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -129,7 +130,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -137,7 +138,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") ) @@ -172,7 +173,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -221,7 +222,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -229,7 +230,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") ) @@ -259,7 +260,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -324,7 +325,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -404,7 +405,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -458,7 +459,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -567,7 +568,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -621,7 +622,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -730,7 +731,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -788,7 +789,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") ) @@ -832,7 +833,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0") ) @@ -865,7 +866,7 @@ public class HttpRemoteTaskRunnerTest DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0") ) @@ -1159,7 +1160,7 @@ public class HttpRemoteTaskRunnerTest { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java index 70102fd67dc..eaa5a36e0fa 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -42,11 +42,7 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer private final ObjectMapper jsonMapper; @Inject - public CuratorDruidNodeAnnouncer( - Announcer announcer, - ZkPathsConfig config, - @Json ObjectMapper jsonMapper - ) + public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) { this.announcer = announcer; this.config = config; @@ -59,14 +55,12 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer try { log.info("Announcing [%s].", discoveryDruidNode); - announcer.announce( - ZKPaths.makePath( - config.getInternalDiscoveryPath(), - discoveryDruidNode.getNodeType(), - discoveryDruidNode.getDruidNode().getHostAndPortToUse() - ), - jsonMapper.writeValueAsBytes(discoveryDruidNode) + String path = ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType().toString(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() ); + announcer.announce(path, jsonMapper.writeValueAsBytes(discoveryDruidNode)); log.info("Announced [%s].", discoveryDruidNode); } @@ -80,13 +74,12 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer { log.info("Unannouncing [%s].", discoveryDruidNode); - announcer.unannounce( - ZKPaths.makePath( - config.getInternalDiscoveryPath(), - discoveryDruidNode.getNodeType(), - discoveryDruidNode.getDruidNode().getHostAndPortToUse() - ) + String path = ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType().toString(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() ); + announcer.unannounce(path); log.info("Unannounced [%s].", discoveryDruidNode); } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 212d27d9f99..46b8ce912df 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -32,6 +32,7 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.ISE; @@ -65,7 +66,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide private ExecutorService listenerExecutor; - private final Map nodeTypeWatchers = new ConcurrentHashMap<>(); + private final Map nodeTypeWatchers = new ConcurrentHashMap<>(); private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -82,27 +83,23 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide } @Override - public DruidNodeDiscovery getForNodeType(String nodeType) + public DruidNodeDiscovery getForNodeType(NodeType nodeType) { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); - return nodeTypeWatchers.compute( + return nodeTypeWatchers.computeIfAbsent( nodeType, - (k, v) -> { - if (v != null) { - return v; - } - - log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType); + nType -> { + log.info("Creating NodeTypeWatcher for nodeType [%s].", nType); NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher( listenerExecutor, curatorFramework, config.getInternalDiscoveryPath(), jsonMapper, - nodeType + nType ); nodeTypeWatcher.start(); - log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType); + log.info("Created NodeTypeWatcher for nodeType [%s].", nType); return nodeTypeWatcher; } ); @@ -154,7 +151,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide private final CuratorFramework curatorFramework; - private final String nodeType; + private final NodeType nodeType; private final ObjectMapper jsonMapper; // hostAndPort -> DiscoveryDruidNode @@ -165,7 +162,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide private final ExecutorService listenerExecutor; - private final List nodeListeners = new ArrayList(); + private final List nodeListeners = new ArrayList<>(); private final Object lock = new Object(); @@ -176,7 +173,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide CuratorFramework curatorFramework, String basePath, ObjectMapper jsonMapper, - String nodeType + NodeType nodeType ) { this.listenerExecutor = listenerExecutor; @@ -188,7 +185,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeTypeWatcher[%s]", nodeType)); this.cache = new PathChildrenCache( curatorFramework, - ZKPaths.makePath(basePath, nodeType), + ZKPaths.makePath(basePath, nodeType.toString()), true, true, cacheExecutor @@ -241,10 +238,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide return; } - DiscoveryDruidNode druidNode = jsonMapper.readValue( - data, - DiscoveryDruidNode.class - ); + DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class); if (!nodeType.equals(druidNode.getNodeType())) { log.warn( @@ -255,11 +249,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide return; } - log.info( - "Node[%s:%s] appeared.", - druidNode.getDruidNode().getHostAndPortToUse(), - druidNode - ); + log.info("Node[%s:%s] appeared.", druidNode.getDruidNode().getHostAndPortToUse(), druidNode); addNode(druidNode); @@ -330,10 +320,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide } } - private void safeSchedule( - Runnable runnable, - String errMsgFormat, Object... args - ) + private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args) { listenerExecutor.submit(() -> { try { diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java index fbb124afd2b..dd843397b1f 100644 --- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java +++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java @@ -36,7 +36,7 @@ import java.util.Objects; public class DiscoveryDruidNode { private final DruidNode druidNode; - private final String nodeType; + private final NodeType nodeType; // Other metadata associated with the node e.g. // if its a historical node then lookup information, segment loading capacity etc. @@ -45,7 +45,7 @@ public class DiscoveryDruidNode @JsonCreator public DiscoveryDruidNode( @JsonProperty("druidNode") DruidNode druidNode, - @JsonProperty("nodeType") String nodeType, + @JsonProperty("nodeType") NodeType nodeType, @JsonProperty("services") Map services ) { @@ -64,7 +64,7 @@ public class DiscoveryDruidNode } @JsonProperty - public String getNodeType() + public NodeType getNodeType() { return nodeType; } diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java index 72feb9cdbbd..f0563a40b00 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java @@ -67,7 +67,7 @@ public class DruidLeaderClient private final HttpClient httpClient; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private final String nodeTypeToWatch; + private final NodeType nodeTypeToWatch; private final String leaderRequestPath; @@ -81,7 +81,7 @@ public class DruidLeaderClient public DruidLeaderClient( HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - String nodeTypeToWatch, + NodeType nodeTypeToWatch, String leaderRequestPath, ServerDiscoverySelector serverDiscoverySelector ) diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java index dcd30ccb5e6..9ae34a3c19a 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java @@ -35,63 +35,40 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** - * Provider of DruidNodeDiscovery instances. + * Provider of {@link DruidNodeDiscovery} instances. */ public abstract class DruidNodeDiscoveryProvider { - private static final Logger log = new Logger(DruidNodeDiscoveryProvider.class); - - public static final String NODE_TYPE_COORDINATOR = "coordinator"; - public static final String NODE_TYPE_HISTORICAL = "historical"; - public static final String NODE_TYPE_BROKER = "broker"; - public static final String NODE_TYPE_OVERLORD = "overlord"; - public static final String NODE_TYPE_PEON = "peon"; - public static final String NODE_TYPE_ROUTER = "router"; - public static final String NODE_TYPE_MM = "middleManager"; - - public static final Set ALL_NODE_TYPES = ImmutableSet.of( - NODE_TYPE_COORDINATOR, - NODE_TYPE_HISTORICAL, - NODE_TYPE_BROKER, - NODE_TYPE_OVERLORD, - NODE_TYPE_PEON, - NODE_TYPE_ROUTER, - NODE_TYPE_MM + private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON), + DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON), + WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.PEON) ); - private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( - LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_BROKER, NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), - DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), - WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM) - ); - - private final ConcurrentHashMap serviceDiscoveryMap = new ConcurrentHashMap<>( - SERVICE_TO_NODE_TYPES.size()); + private final ConcurrentHashMap serviceDiscoveryMap = + new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size()); /** * Get DruidNodeDiscovery instance to discover nodes of given nodeType. */ - public abstract DruidNodeDiscovery getForNodeType(String nodeType); + public abstract DruidNodeDiscovery getForNodeType(NodeType nodeType); /** * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata. */ public DruidNodeDiscovery getForService(String serviceName) { - return serviceDiscoveryMap.compute( + return serviceDiscoveryMap.computeIfAbsent( serviceName, - (k, v) -> { - if (v != null) { - return v; - } + service -> { - Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); + Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service); if (nodeTypesToWatch == null) { - throw new IAE("Unknown service [%s].", serviceName); + throw new IAE("Unknown service [%s].", service); } - ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName); - for (String nodeType : nodeTypesToWatch) { + ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service); + for (NodeType nodeType : nodeTypesToWatch) { getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener()); } return serviceDiscovery; diff --git a/server/src/main/java/org/apache/druid/discovery/NodeType.java b/server/src/main/java/org/apache/druid/discovery/NodeType.java new file mode 100644 index 00000000000..841656bf1fc --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/NodeType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonValue; + +/** + * + * This is a historical occasion that this enum is different from {@link + * org.apache.druid.server.coordination.ServerType} because they are essentially the same abstraction, but merging them + * could only increase the complexity and drop the code safety, because they name the same types differently ("peon" - + * "indexer-executor" and "middleManager" - "realtime") and both expose them via JSON APIs. + */ +public enum NodeType +{ + COORDINATOR("coordinator"), + HISTORICAL("historical"), + BROKER("broker"), + OVERLORD("overlord"), + PEON("peon"), + ROUTER("router"), + MIDDLE_MANAGER("middleManager"); + + private final String jsonName; + + NodeType(String jsonName) + { + this.jsonName = jsonName; + } + + /** + * Lowercase for backward compatibility, as a part of the {@link DiscoveryDruidNode}'s JSON format. + * + * Don't need to define {@link com.fasterxml.jackson.annotation.JsonCreator} because for enum types {@link JsonValue} + * serves for both serialization and deserialization, see the Javadoc comment of {@link JsonValue}. + */ + @JsonValue + public String getJsonName() + { + return jsonName; + } +} diff --git a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java index 3440b545e26..d4ac2c454a0 100644 --- a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java @@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.java.util.http.client.HttpClient; @@ -64,7 +65,7 @@ public class CoordinatorDiscoveryModule implements Module return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, "/druid/coordinator/v1/leader", serverDiscoverySelector ); diff --git a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java index 4348b5b8027..05b76ed2294 100644 --- a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java @@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.java.util.http.client.HttpClient; @@ -64,7 +65,7 @@ public class IndexingServiceDiscoveryModule implements Module return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, "/druid/indexer/v1/leader", serverDiscoverySelector ); diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java index 9e5a46e642c..240cee64904 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java @@ -26,7 +26,8 @@ import org.apache.druid.java.util.common.StringUtils; /** * This enum represents types of druid services that hold segments. *

- * These types are externally visible (e.g., from the output of /druid/coordinator/v1/servers). + * These types are externally visible (e.g., from the output of {@link + * org.apache.druid.server.http.ServersResource#makeSimpleServer}). *

* For backwards compatibility, when presenting these types externally, the toString() representation * of the enum should be used. @@ -34,6 +35,11 @@ import org.apache.druid.java.util.common.StringUtils; * The toString() method converts the enum name() to lowercase and replaces underscores with hyphens, * which is the format expected for the server type string prior to the patch that introduced ServerType: * https://github.com/apache/incubator-druid/pull/4148 + * + * This is a historical occasion that this enum is different from {@link org.apache.druid.discovery.NodeType} because + * they are essentially the same abstraction, but merging them could only increase the complexity and drop the code + * safety, because they name the same types differently ("indexer-executor" - "peon" and "realtime" - "middleManager") + * and both expose them via JSON APIs. */ public enum ServerType { diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java index 5fff19e4a10..0bc871386c1 100644 --- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java +++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java @@ -28,8 +28,8 @@ import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.security.StateResourceFilter; @@ -40,6 +40,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.util.Arrays; import java.util.Collection; /** @@ -59,33 +60,23 @@ public class ClusterResource @GET @Produces(MediaType.APPLICATION_JSON) - public Response getClusterServers( - @QueryParam("full") boolean full - ) + public Response getClusterServers(@QueryParam("full") boolean full) { - ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, full) - ); + entityBuilder.put(NodeType.COORDINATOR, getNodes(NodeType.COORDINATOR, full)); + entityBuilder.put(NodeType.OVERLORD, getNodes(NodeType.OVERLORD, full)); + entityBuilder.put(NodeType.BROKER, getNodes(NodeType.BROKER, full)); + entityBuilder.put(NodeType.HISTORICAL, getNodes(NodeType.HISTORICAL, full)); - Collection mmNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_MM, full); + Collection mmNodes = getNodes(NodeType.MIDDLE_MANAGER, full); if (!mmNodes.isEmpty()) { - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_MM, mmNodes); + entityBuilder.put(NodeType.MIDDLE_MANAGER, mmNodes); } - Collection routerNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, full); + Collection routerNodes = getNodes(NodeType.ROUTER, full); if (!routerNodes.isEmpty()) { - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, routerNodes); + entityBuilder.put(NodeType.ROUTER, routerNodes); } return Response.status(Response.Status.OK).entity(entityBuilder.build()).build(); @@ -94,28 +85,19 @@ public class ClusterResource @GET @Produces({MediaType.APPLICATION_JSON}) @Path("/{nodeType}") - public Response getClusterServers( - @PathParam("nodeType") String nodeType, - @QueryParam("full") boolean full - ) + public Response getClusterServers(@PathParam("nodeType") NodeType nodeType, @QueryParam("full") boolean full) { - if (nodeType == null || !DruidNodeDiscoveryProvider.ALL_NODE_TYPES.contains(nodeType)) { + if (nodeType == null) { return Response.serverError() .status(Response.Status.BAD_REQUEST) - .entity(StringUtils.format( - "Invalid nodeType [%s]. Valid node types are %s .", - nodeType, - DruidNodeDiscoveryProvider.ALL_NODE_TYPES - )) + .entity("Invalid nodeType of null. Valid node types are " + Arrays.toString(NodeType.values())) .build(); } else { - return Response.status(Response.Status.OK).entity( - getNodes(nodeType, full) - ).build(); + return Response.status(Response.Status.OK).entity(getNodes(nodeType, full)).build(); } } - private Collection getNodes(String nodeType, boolean full) + private Collection getNodes(NodeType nodeType, boolean full) { Collection discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeType(nodeType) .getAllNodes(); diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 4b224c89ee7..fcc3303e7c1 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -29,6 +29,7 @@ import org.apache.druid.client.selector.Server; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -125,7 +126,7 @@ public class TieredBrokerHostSelector servers.put(entry.getValue(), new NodesHolder()); } - DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER); + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER); druidNodeDiscovery.registerListener( new DruidNodeDiscovery.Listener() { diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index 96bc3893b01..80681576775 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -29,6 +29,7 @@ import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.http.client.HttpClient; @@ -62,7 +63,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; - /** */ public class HttpServerInventoryViewTest @@ -168,7 +168,7 @@ public class HttpServerInventoryViewTest DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) ) diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index a8be9337b6f..daac842dd57 100644 --- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -28,7 +28,7 @@ import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.announcement.Announcer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; @@ -81,25 +81,25 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, ImmutableMap.of() ); DiscoveryDruidNode node4 = new DiscoveryDruidNode( new DruidNode("s4", "h4", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, ImmutableMap.of() ); @@ -113,8 +113,8 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase ); druidNodeDiscoveryProvider.start(); - DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR); - DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD); + DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR); + DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD); while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) { Thread.sleep(100); diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java index 60bd5662e6d..b9f00c07b08 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java @@ -64,6 +64,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; /** */ @@ -79,7 +80,7 @@ public class DruidLeaderClientTest extends BaseJettyTest protected Injector setupInjector() { final DruidNode node = new DruidNode("test", "localhost", null, null, true, false); - discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of()); + discoveryDruidNode = new DiscoveryDruidNode(node, NodeType.PEON, ImmutableMap.of()); Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of( @@ -114,21 +115,21 @@ public class DruidLeaderClientTest extends BaseJettyTest ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -139,14 +140,14 @@ public class DruidLeaderClientTest extends BaseJettyTest EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of()); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -166,21 +167,21 @@ public class DruidLeaderClientTest extends BaseJettyTest ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -191,33 +192,30 @@ public class DruidLeaderClientTest extends BaseJettyTest EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes(); DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( - ImmutableList.of(new DiscoveryDruidNode( - new DruidNode("test", "dummyhost", 64231, null, true, false), - "test", - ImmutableMap.of() - )) - ); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( - ImmutableList.of(discoveryDruidNode) + DiscoveryDruidNode dummyNode = new DiscoveryDruidNode( + new DruidNode("test", "dummyhost", 64231, null, true, false), + NodeType.PEON, + ImmutableMap.of() ); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(dummyNode)); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery).anyTimes(); EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", serverDiscoverySelector ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -230,14 +228,14 @@ public class DruidLeaderClientTest extends BaseJettyTest ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java index abce2b802ae..5c13dacc1ef 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java @@ -87,7 +87,7 @@ public class DruidNodeDiscoveryProviderTest DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) @@ -95,21 +95,21 @@ public class DruidNodeDiscoveryProviderTest DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node4 = new DiscoveryDruidNode( new DruidNode("s4", "h4", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) @@ -117,35 +117,35 @@ public class DruidNodeDiscoveryProviderTest DiscoveryDruidNode node5 = new DiscoveryDruidNode( new DruidNode("s5", "h5", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) ); DiscoveryDruidNode node6 = new DiscoveryDruidNode( new DruidNode("s6", "h6", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node7 = new DiscoveryDruidNode( new DruidNode("s7", "h7", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node7Clone = new DiscoveryDruidNode( new DruidNode("s7", "h7", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node8 = new DiscoveryDruidNode( new DruidNode("s8", "h8", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); @@ -183,7 +183,7 @@ public class DruidNodeDiscoveryProviderTest private List listeners = new ArrayList<>(); @Override - public DruidNodeDiscovery getForNodeType(String nodeType) + public DruidNodeDiscovery getForNodeType(NodeType nodeType) { return new DruidNodeDiscovery() { diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java index d69e676601c..14479275635 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java @@ -26,6 +26,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.HostAndPortWithScheme; import org.easymock.EasyMock; @@ -53,21 +54,21 @@ public class LookupNodeDiscoveryTest DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) ); DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2")) ); diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index aa3d2f8c8cf..8d2a3947ac7 100644 --- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.selector.Server; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -74,19 +75,19 @@ public class TieredBrokerHostSelectorTest node1 = new DiscoveryDruidNode( new DruidNode("hotBroker", "hotHost", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); node2 = new DiscoveryDruidNode( new DruidNode("coldBroker", "coldHost1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); node3 = new DiscoveryDruidNode( new DruidNode("coldBroker", "coldHost2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); @@ -105,7 +106,7 @@ public class TieredBrokerHostSelectorTest } }; - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index a1fa6cda85f..009a6fedbc5 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -34,8 +34,8 @@ import org.apache.druid.client.cache.CacheMonitor; import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig; import org.apache.druid.client.selector.ServerSelectorStrategy; import org.apache.druid.client.selector.TierSelectorStrategy; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.Jerseys; @@ -125,12 +125,10 @@ public class CliBroker extends ServerRunnable LifecycleModule.register(binder, Server.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - ImmutableList.of(LookupNodeService.class) - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class))) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); }, new LookupModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 90268426127..5e4cd062e7f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -36,7 +36,7 @@ import org.apache.druid.client.HttpServerInventoryViewResource; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.ConditionalMultibind; import org.apache.druid.guice.ConfigProvider; import org.apache.druid.guice.Jerseys; @@ -217,12 +217,11 @@ public class CliCoordinator extends ServerRunnable DruidCoordinatorCleanupPendingSegments.class ); - binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(Coordinator.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .annotatedWith(Coordinator.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index fafeadb6ace..7d3c7e8767b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -27,8 +27,8 @@ import io.airlift.airline.Command; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CacheMonitor; import org.apache.druid.discovery.DataNodeService; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.Jerseys; @@ -103,12 +103,15 @@ public class CliHistorical extends ServerRunnable binder.install(new CacheModule()); MetricsModule.register(binder, CacheMonitor.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - ImmutableList.of(DataNodeService.class, LookupNodeService.class) + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider( + new DiscoverySideEffectsProvider( + NodeType.HISTORICAL, + ImmutableList.of(DataNodeService.class, LookupNodeService.class) + ) ) - ).in(LazySingleton.class); + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); }, new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index e2c31af7200..8fa80e57ff9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -30,7 +30,7 @@ import com.google.inject.name.Names; import com.google.inject.util.Providers; import io.airlift.airline.Command; import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; import org.apache.druid.guice.IndexingServiceModuleHelper; @@ -129,12 +129,12 @@ public class CliMiddleManager extends ServerRunnable LifecycleModule.register(binder, Server.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_MM, - ImmutableList.of(WorkerNodeService.class) + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider( + new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class)) ) - ).in(LazySingleton.class); + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 000ef53de9a..45299a83d77 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -38,7 +38,7 @@ import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceSelectorConfig; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.IndexingServiceFirehoseModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; @@ -234,12 +234,11 @@ public class CliOverlord extends ServerRunnable LifecycleModule.register(binder, Server.class); } - binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(IndexingService.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .annotatedWith(IndexingService.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index 88a77a34662..8c0d83e4d93 100644 --- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -32,6 +32,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -120,12 +121,10 @@ public class CliRouter extends ServerRunnable LifecycleModule.register(binder, Server.class); DiscoveryModule.register(binder, Self.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } @@ -134,7 +133,6 @@ public class CliRouter extends ServerRunnable public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( TieredBrokerConfig config, ServerDiscoveryFactory factory - ) { return factory.createSelector(config.getCoordinatorServiceName()); @@ -151,7 +149,7 @@ public class CliRouter extends ServerRunnable return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, "/druid/coordinator/v1/leader", serverDiscoverySelector ); diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java index bc9c68bbacc..7f60f1f15d2 100644 --- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java +++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java @@ -27,6 +27,7 @@ import com.google.inject.Provider; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.DruidService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; @@ -58,8 +59,8 @@ public abstract class ServerRunnable extends GuiceRunnable } /** - * This is a helper class used by CliXXX classes to announce DiscoveryDruidNode - * as part of lifecycle Stage.LAST . + * This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode} + * as part of {@link Lifecycle.Stage#LAST}. */ protected static class DiscoverySideEffectsProvider implements Provider { @@ -77,10 +78,10 @@ public abstract class ServerRunnable extends GuiceRunnable @Inject private Injector injector; - private final String nodeType; + private final NodeType nodeType; private final List> serviceClasses; - public DiscoverySideEffectsProvider(String nodeType, List> serviceClasses) + public DiscoverySideEffectsProvider(NodeType nodeType, List> serviceClasses) { this.nodeType = nodeType; this.serviceClasses = serviceClasses; @@ -95,10 +96,7 @@ public abstract class ServerRunnable extends GuiceRunnable builder.put(service.getName(), service); } - DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, - nodeType, - builder.build() - ); + DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeType, builder.build()); lifecycle.addHandler( new Lifecycle.Handler()