diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 4cbe042350a..950be651e86 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker
     return segmentLoader.isSegmentLoaded(segment);
   }
 
-  public void loadSegment(final DataSegment segment) throws SegmentLoadingException
+  /**
+   * Load a single segment.
+   * @param segment segment to load
+   * @return true if the segment was newly loaded, false if it was already loaded
+   * @throws SegmentLoadingException if the segment cannot be loaded
+   */
+  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
   {
     final Segment adapter;
     try {
@@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker
           segment.getVersion()
       );
       if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
-        log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
-        throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier());
+        log.warn("Told to load an adapter for a segment[%s] that already exists", segment.getIdentifier());
+        return false;
       }
 
       loadedIntervals.add(
@@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker
 
       synchronized (dataSourceCounts) {
         dataSourceCounts.add(dataSource, 1L);
       }
+      return true;
     }
   }
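The change above relaxes loadSegment()'s contract: a duplicate load is now reported through the boolean return value instead of a SegmentLoadingException. A minimal caller sketch under that contract (SegmentLoadHelper and loadIfAbsent are hypothetical names, not part of this patch):

    import io.druid.segment.loading.SegmentLoadingException;
    import io.druid.server.coordination.ServerManager;
    import io.druid.timeline.DataSegment;

    // Hypothetical helper illustrating the patched contract: false now means
    // "segment (or its shard chunk) is already being served", so a duplicate
    // load request degrades to a no-op instead of an error.
    public class SegmentLoadHelper
    {
      private final ServerManager serverManager;

      public SegmentLoadHelper(ServerManager serverManager)
      {
        this.serverManager = serverManager;
      }

      // Returns true only when the segment was newly loaded and therefore
      // still needs its info cache file written and an announcement made.
      public boolean loadIfAbsent(DataSegment segment) throws SegmentLoadingException
      {
        return serverManager.loadSegment(segment);
      }
    }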
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index a55341a75a1..246415f57d0 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     try {
       log.info("Loading segment %s", segment.getIdentifier());
 
+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         removeSegment(segment);
         throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
       }
 
-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            removeSegment(segment);
+            throw new SegmentLoadingException(
+                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+            );
+          }
+        }
+
         try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          announcer.announceSegment(segment);
         }
         catch (IOException e) {
-          removeSegment(segment);
-          throw new SegmentLoadingException(
-              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-          );
+          throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
        }
      }
-
-      try {
-        announcer.announceSegment(segment);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
-      }
-
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
@@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     for (DataSegment segment : segments) {
       log.info("Loading segment %s", segment.getIdentifier());
 
+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
@@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         continue;
       }
 
-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
-        try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+            removeSegment(segment);
+            segmentFailures.add(segment.getIdentifier());
+            continue;
+          }
         }
-        catch (IOException e) {
-          log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-          removeSegment(segment);
-          segmentFailures.add(segment.getIdentifier());
-          continue;
-        }
-      }
 
-      validSegments.add(segment);
+        validSegments.add(segment);
+      }
     }
 
     try {
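Taken together, the two ZkCoordinator hunks gate both the info-cache write and the ZooKeeper announcement on the new return value. The single-segment handler after the patch reduces to roughly this sketch (not the literal method; the real code wraps failures in SegmentLoadingException, removes the segment, and alerts; serverManager, config, jsonMapper and announcer are the fields used in the diff above):

    // Sketch of the patched single-segment load flow.
    private void addSegmentSketch(DataSegment segment) throws SegmentLoadingException, IOException
    {
      final boolean loaded = serverManager.loadSegment(segment);
      if (loaded) {
        final File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
        if (!segmentInfoCacheFile.exists()) {
          // Persist segment metadata so the segment can be reloaded after a restart.
          jsonMapper.writeValue(segmentInfoCacheFile, segment);
        }
        // Only freshly loaded segments are announced; an already-served
        // segment keeps its existing announcement.
        announcer.announceSegment(segment);
      }
    }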
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index 0ae18185015..71a4d0eb08c 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView;
 import io.druid.client.indexing.IndexingServiceClient;
 import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
+import io.druid.curator.discovery.ServiceAnnouncer;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseSegmentManager;
 import io.druid.guice.ManageLifecycle;
+import io.druid.guice.annotations.Self;
 import io.druid.segment.IndexIO;
+import io.druid.server.DruidNode;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.timeline.DataSegment;
 import org.apache.curator.framework.CuratorFramework;
@@ -99,6 +102,8 @@ public class DruidCoordinator
   private final LoadQueueTaskMaster taskMaster;
   private final Map<String, LoadQueuePeon> loadManagementPeons;
   private final AtomicReference<LeaderLatch> leaderLatch;
+  private final ServiceAnnouncer serviceAnnouncer;
+  private final DruidNode self;
 
   @Inject
   public DruidCoordinator(
@@ -112,7 +117,9 @@ public class DruidCoordinator
       ServiceEmitter emitter,
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
-      LoadQueueTaskMaster taskMaster
+      LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      @Self DruidNode self
   )
   {
     this(
@@ -127,6 +134,8 @@ public class DruidCoordinator
         scheduledExecutorFactory,
         indexingServiceClient,
         taskMaster,
+        serviceAnnouncer,
+        self,
         Maps.newConcurrentMap()
     );
   }
@@ -143,6 +152,8 @@ public class DruidCoordinator
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
       LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      DruidNode self,
       ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
   )
   {
@@ -157,6 +168,8 @@ public class DruidCoordinator
     this.emitter = emitter;
     this.indexingServiceClient = indexingServiceClient;
     this.taskMaster = taskMaster;
+    this.serviceAnnouncer = serviceAnnouncer;
+    this.self = self;
 
     this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
 
@@ -474,6 +487,7 @@ public class DruidCoordinator
         databaseSegmentManager.start();
         databaseRuleManager.start();
         serverInventoryView.start();
+        serviceAnnouncer.announce(self);
 
         final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
         dynamicConfigs = configManager.watch(
@@ -554,8 +568,10 @@ public class DruidCoordinator
         }
         loadManagementPeons.clear();
 
-        databaseSegmentManager.stop();
+        serviceAnnouncer.unannounce(self);
         serverInventoryView.stop();
+        databaseRuleManager.stop();
+        databaseSegmentManager.stop();
         leader = false;
       }
       catch (Exception e) {
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
index 8f55a93948c..58323faa863 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import io.druid.client.DruidServer;
 import io.druid.client.SingleServerInventoryView;
+import io.druid.curator.discovery.NoopServiceAnnouncer;
 import io.druid.curator.inventory.InventoryManagerConfig;
 import io.druid.db.DatabaseSegmentManager;
+import io.druid.server.DruidNode;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
@@ -111,6 +113,8 @@ public class DruidCoordinatorTest
         scheduledExecutorFactory,
         null,
         taskMaster,
+        new NoopServiceAnnouncer(),
+        new DruidNode("hey", "what", 1234),
         loadManagementPeons
     );
   }
diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java
index c37702d9310..47240c691ee 100644
--- a/services/src/main/java/io/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/io/druid/cli/CliCoordinator.java
@@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Command;
 import io.druid.client.indexing.IndexingServiceClient;
-import io.druid.curator.discovery.DiscoveryModule;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseRuleManagerConfig;
 import io.druid.db.DatabaseRuleManagerProvider;
@@ -41,7 +40,6 @@ import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
 import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
-import io.druid.guice.annotations.Self;
 import io.druid.server.coordinator.DruidCoordinator;
 import io.druid.server.coordinator.DruidCoordinatorConfig;
 import io.druid.server.coordinator.LoadQueueTaskMaster;
@@ -103,7 +101,6 @@ public class CliCoordinator extends ServerRunnable
 
             binder.bind(DruidCoordinator.class);
             LifecycleModule.register(binder, DruidCoordinator.class);
-            DiscoveryModule.register(binder, Self.class);
 
             binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());
             Jerseys.addResource(binder, BackwardsCompatibleInfoResource.class);
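Beyond segment loading, the DruidCoordinator hunks register the coordinator with service discovery while it is leader, and reorder shutdown so dependencies stop in reverse of their start order. The intended pairing, distilled from the two lifecycle hunks above (a sketch only; the real methods also manage load queue peons, config watches, and the leader latch):

    // Becoming leader: start dependencies, then announce this node so other
    // services can discover the active coordinator.
    databaseSegmentManager.start();
    databaseRuleManager.start();
    serverInventoryView.start();
    serviceAnnouncer.announce(self);

    // Losing leadership: withdraw the discovery announcement first, then stop
    // dependencies in reverse order of startup.
    serviceAnnouncer.unannounce(self);
    serverInventoryView.stop();
    databaseRuleManager.stop();
    databaseSegmentManager.stop();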