diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 0ae18185015..71a4d0eb08c 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseSegmentManager; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; import io.druid.segment.IndexIO; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -99,6 +102,8 @@ public class DruidCoordinator private final LoadQueueTaskMaster taskMaster; private final Map loadManagementPeons; private final AtomicReference leaderLatch; + private final ServiceAnnouncer serviceAnnouncer; + private final DruidNode self; @Inject public DruidCoordinator( @@ -112,7 +117,9 @@ public class DruidCoordinator ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, - LoadQueueTaskMaster taskMaster + LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + @Self DruidNode self ) { this( @@ -127,6 +134,8 @@ public class DruidCoordinator scheduledExecutorFactory, indexingServiceClient, taskMaster, + serviceAnnouncer, + self, Maps.newConcurrentMap() ); } @@ -143,6 +152,8 @@ public class DruidCoordinator ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + DruidNode self, ConcurrentMap loadQueuePeonMap ) { @@ -157,6 +168,8 @@ public class DruidCoordinator this.emitter = emitter; this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; + this.serviceAnnouncer = serviceAnnouncer; + this.self = self; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -474,6 +487,7 @@ public class DruidCoordinator databaseSegmentManager.start(); databaseRuleManager.start(); serverInventoryView.start(); + serviceAnnouncer.announce(self); final List> coordinatorRunnables = Lists.newArrayList(); dynamicConfigs = configManager.watch( @@ -554,8 +568,10 @@ public class DruidCoordinator } loadManagementPeons.clear(); - databaseSegmentManager.stop(); + serviceAnnouncer.unannounce(self); serverInventoryView.stop(); + databaseRuleManager.stop(); + databaseSegmentManager.stop(); leader = false; } catch (Exception e) { diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 8f55a93948c..58323faa863 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker; import com.metamx.common.concurrent.ScheduledExecutorFactory; import io.druid.client.DruidServer; import io.druid.client.SingleServerInventoryView; +import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.curator.inventory.InventoryManagerConfig; import io.druid.db.DatabaseSegmentManager; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; @@ -111,6 +113,8 @@ public class DruidCoordinatorTest scheduledExecutorFactory, null, taskMaster, + new NoopServiceAnnouncer(), + new DruidNode("hey", "what", 1234), loadManagementPeons ); } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 13295bd3dee..1f7fc14b960 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.indexing.IndexingServiceClient; -import io.druid.curator.discovery.DiscoveryModule; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseRuleManagerConfig; import io.druid.db.DatabaseRuleManagerProvider; @@ -41,7 +40,6 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Self; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; @@ -106,7 +104,6 @@ public class CliCoordinator extends ServerRunnable binder.bind(DruidCoordinator.class); LifecycleModule.register(binder, DruidCoordinator.class); - DiscoveryModule.register(binder, Self.class); binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer()); Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class);