From d2451fa37bf1f62367f0c805baaa2f2e4480574c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Dec 2013 11:12:20 -0800 Subject: [PATCH 1/2] Coordinator: Stop databaseRuleManager in stopBeingLeader --- .../java/io/druid/server/coordinator/DruidCoordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 0ae18185015..83eac0b9646 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -554,8 +554,9 @@ public class DruidCoordinator } loadManagementPeons.clear(); - databaseSegmentManager.stop(); serverInventoryView.stop(); + databaseRuleManager.stop(); + databaseSegmentManager.stop(); leader = false; } catch (Exception e) { From 7759ed45178eefd7d9e827f20f7d1582377b4a5b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Dec 2013 11:12:35 -0800 Subject: [PATCH 2/2] Coordinator: Link service announcement to leadership --- .../server/coordinator/DruidCoordinator.java | 17 ++++++++++++++++- .../coordinator/DruidCoordinatorTest.java | 4 ++++ .../main/java/io/druid/cli/CliCoordinator.java | 3 --- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 83eac0b9646..71a4d0eb08c 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; +import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseSegmentManager; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Self; import io.druid.segment.IndexIO; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -99,6 +102,8 @@ public class DruidCoordinator private final LoadQueueTaskMaster taskMaster; private final Map loadManagementPeons; private final AtomicReference leaderLatch; + private final ServiceAnnouncer serviceAnnouncer; + private final DruidNode self; @Inject public DruidCoordinator( @@ -112,7 +117,9 @@ public class DruidCoordinator ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, - LoadQueueTaskMaster taskMaster + LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + @Self DruidNode self ) { this( @@ -127,6 +134,8 @@ public class DruidCoordinator scheduledExecutorFactory, indexingServiceClient, taskMaster, + serviceAnnouncer, + self, Maps.newConcurrentMap() ); } @@ -143,6 +152,8 @@ public class DruidCoordinator ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, + ServiceAnnouncer serviceAnnouncer, + DruidNode self, ConcurrentMap loadQueuePeonMap ) { @@ -157,6 +168,8 @@ public class DruidCoordinator this.emitter = emitter; this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; + this.serviceAnnouncer = serviceAnnouncer; + this.self = self; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -474,6 +487,7 @@ public class DruidCoordinator databaseSegmentManager.start(); databaseRuleManager.start(); serverInventoryView.start(); + serviceAnnouncer.announce(self); final List> coordinatorRunnables = Lists.newArrayList(); dynamicConfigs = configManager.watch( @@ -554,6 +568,7 @@ public class DruidCoordinator } loadManagementPeons.clear(); + serviceAnnouncer.unannounce(self); serverInventoryView.stop(); databaseRuleManager.stop(); databaseSegmentManager.stop(); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 8f55a93948c..58323faa863 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker; import com.metamx.common.concurrent.ScheduledExecutorFactory; import io.druid.client.DruidServer; import io.druid.client.SingleServerInventoryView; +import io.druid.curator.discovery.NoopServiceAnnouncer; import io.druid.curator.inventory.InventoryManagerConfig; import io.druid.db.DatabaseSegmentManager; +import io.druid.server.DruidNode; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; @@ -111,6 +113,8 @@ public class DruidCoordinatorTest scheduledExecutorFactory, null, taskMaster, + new NoopServiceAnnouncer(), + new DruidNode("hey", "what", 1234), loadManagementPeons ); } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 13295bd3dee..1f7fc14b960 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.indexing.IndexingServiceClient; -import io.druid.curator.discovery.DiscoveryModule; import io.druid.db.DatabaseRuleManager; import io.druid.db.DatabaseRuleManagerConfig; import io.druid.db.DatabaseRuleManagerProvider; @@ -41,7 +40,6 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Self; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; @@ -106,7 +104,6 @@ public class CliCoordinator extends ServerRunnable binder.bind(DruidCoordinator.class); LifecycleModule.register(binder, DruidCoordinator.class); - DiscoveryModule.register(binder, Self.class); binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer()); Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class);