Coordinator: Link service announcement to leadership

This commit is contained in:
Gian Merlino 2013-12-16 11:12:35 -08:00
parent d2451fa37b
commit 7759ed4517
3 changed files with 20 additions and 4 deletions

View File

@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseSegmentManager;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.segment.IndexIO;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -99,6 +102,8 @@ public class DruidCoordinator
private final LoadQueueTaskMaster taskMaster;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
@Inject
public DruidCoordinator(
@ -112,7 +117,9 @@ public class DruidCoordinator
ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self
)
{
this(
@ -127,6 +134,8 @@ public class DruidCoordinator
scheduledExecutorFactory,
indexingServiceClient,
taskMaster,
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap()
);
}
@ -143,6 +152,8 @@ public class DruidCoordinator
ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
)
{
@ -157,6 +168,8 @@ public class DruidCoordinator
this.emitter = emitter;
this.indexingServiceClient = indexingServiceClient;
this.taskMaster = taskMaster;
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@ -474,6 +487,7 @@ public class DruidCoordinator
databaseSegmentManager.start();
databaseRuleManager.start();
serverInventoryView.start();
serviceAnnouncer.announce(self);
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
dynamicConfigs = configManager.watch(
@ -554,6 +568,7 @@ public class DruidCoordinator
}
loadManagementPeons.clear();
serviceAnnouncer.unannounce(self);
serverInventoryView.stop();
databaseRuleManager.stop();
databaseSegmentManager.stop();

View File

@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.db.DatabaseSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
@ -111,6 +113,8 @@ public class DruidCoordinatorTest
scheduledExecutorFactory,
null,
taskMaster,
new NoopServiceAnnouncer(),
new DruidNode("hey", "what", 1234),
loadManagementPeons
);
}

View File

@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseRuleManagerConfig;
import io.druid.db.DatabaseRuleManagerProvider;
@ -41,7 +40,6 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadQueueTaskMaster;
@ -106,7 +104,6 @@ public class CliCoordinator extends ServerRunnable
binder.bind(DruidCoordinator.class);
LifecycleModule.register(binder, DruidCoordinator.class);
DiscoveryModule.register(binder, Self.class);
binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());
Jerseys.addResource(binder, BackwardsCompatiableInfoResource.class);