diff --git a/docs/content/design/coordinator.md b/docs/content/design/coordinator.md index 810f212e604..cf3c0f87a77 100644 --- a/docs/content/design/coordinator.md +++ b/docs/content/design/coordinator.md @@ -52,8 +52,7 @@ Segments can be automatically loaded and dropped from the cluster based on a set ### Cleaning Up Segments -Each run, the Druid Coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped. -Note that if all segments in database are deleted(or marked unused), then Coordinator will not drop anything from the Historicals. This is done to prevent a race condition in which the Coordinator would drop all segments if it started running cleanup before it finished polling the database for available segments for the first time and believed that there were no segments. +Each run, the Druid coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped. ### Segment Availability diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 436ad125bf7..a5584a9bc3f 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -58,6 +58,13 @@ public interface MetadataSegmentManager @Nullable ImmutableDruidDataSource getDataSource(String dataSourceName); + /** + * Returns a collection of known datasources. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) + */ + @Nullable Collection getDataSources(); /** @@ -65,7 +72,11 @@ public interface MetadataSegmentManager * unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than * several times. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) */ + @Nullable Iterable iterateAllSegments(); Collection getAllDataSourceNames(); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index dff9c9df61c..0bcf9face18 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -66,6 +66,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -73,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class SQLMetadataSegmentManager implements MetadataSegmentManager @@ -102,9 +104,14 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final Supplier dbTables; private final SQLMetadataConnector connector; - private ConcurrentHashMap dataSources = new ConcurrentHashMap<>(); + // Volatile since this reference is reassigned in "poll" and then read from in other threads. + // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map) + @Nullable + private volatile ConcurrentHashMap dataSources = null; - /** The number of times this SQLMetadataSegmentManager was started. */ + /** + * The number of times this SQLMetadataSegmentManager was started. + */ private long startCount = 0; /** * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if @@ -200,7 +207,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager return; } - dataSources = new ConcurrentHashMap<>(); + dataSources = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -325,7 +332,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager ).bind("dataSource", dataSource).execute() ); - dataSources.remove(dataSource); + Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); if (removed == 0) { return false; @@ -348,18 +355,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast. List possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId); - dataSources.computeIfPresent( - dataSourceName, - (dsName, dataSource) -> { - for (SegmentId possibleSegmentId : possibleSegmentIds) { - if (dataSource.removeSegment(possibleSegmentId) != null) { - break; - } - } - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } + Optional.ofNullable(dataSources).ifPresent( + m -> + m.computeIfPresent( + dataSourceName, + (dsName, dataSource) -> { + for (SegmentId possibleSegmentId : possibleSegmentIds) { + if (dataSource.removeSegment(possibleSegmentId) != null) { + break; + } + } + // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. + //noinspection ReturnOfNull + return dataSource.isEmpty() ? null : dataSource; + } + ) ); return removed; @@ -375,14 +385,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager { try { final boolean removed = removeSegmentFromTable(segmentId.toString()); - dataSources.computeIfPresent( - segmentId.getDataSource(), - (dsName, dataSource) -> { - dataSource.removeSegment(segmentId); - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } + Optional.ofNullable(dataSources).ifPresent( + m -> + m.computeIfPresent( + segmentId.getDataSource(), + (dsName, dataSource) -> { + dataSource.removeSegment(segmentId); + // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. + //noinspection ReturnOfNull + return dataSource.isEmpty() ? null : dataSource; + } + ) ); return removed; } @@ -422,23 +435,37 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Nullable public ImmutableDruidDataSource getDataSource(String dataSourceName) { - final DruidDataSource dataSource = dataSources.get(dataSourceName); + final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null); return dataSource == null ? null : dataSource.toImmutableDruidDataSource(); } @Override + @Nullable public Collection getDataSources() { - return dataSources.values() - .stream() - .map(DruidDataSource::toImmutableDruidDataSource) - .collect(Collectors.toList()); + return Optional.ofNullable(dataSources) + .map(m -> + m.values() + .stream() + .map(DruidDataSource::toImmutableDruidDataSource) + .collect(Collectors.toList()) + ) + .orElse(null); } @Override + @Nullable public Iterable iterateAllSegments() { - return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator(); + final ConcurrentHashMap dataSourcesSnapshot = dataSources; + if (dataSourcesSnapshot == null) { + return null; + } + + return () -> dataSourcesSnapshot.values() + .stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); } @Override @@ -543,6 +570,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager .addSegmentIfAbsent(segment); }); + // Replace "dataSources" atomically. dataSources = newDataSources; } @@ -557,7 +585,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager */ private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) { - DruidDataSource dataSource = dataSources.get(segment.getDataSource()); + DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null); if (dataSource == null) { return segment; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 3b5faef7a04..27bee2f0d22 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -74,8 +74,10 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; import org.joda.time.Duration; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -88,6 +90,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class DruidCoordinator @@ -242,7 +245,9 @@ public class DruidCoordinator return loadManagementPeons; } - /** @return tier -> { dataSource -> underReplicationCount } map */ + /** + * @return tier -> { dataSource -> underReplicationCount } map + */ public Map> computeUnderReplicationCountsPerDataSourcePerTier() { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); @@ -251,9 +256,15 @@ public class DruidCoordinator return underReplicationCountsPerDataSourcePerTier; } + final Iterable dataSegments = iterateAvailableDataSegments(); + + if (dataSegments == null) { + return underReplicationCountsPerDataSourcePerTier; + } + final DateTime now = DateTimes.nowUtc(); - for (final DataSegment segment : iterateAvailableDataSegments()) { + for (final DataSegment segment : dataSegments) { final List rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); for (final Rule rule : rules) { @@ -285,7 +296,13 @@ public class DruidCoordinator return retVal; } - for (DataSegment segment : iterateAvailableDataSegments()) { + final Iterable dataSegments = iterateAvailableDataSegments(); + + if (dataSegments == null) { + return retVal; + } + + for (DataSegment segment : dataSegments) { if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) { retVal.addTo(segment.getDataSource(), 1); } else { @@ -298,8 +315,14 @@ public class DruidCoordinator public Map getLoadStatus() { - Map loadStatus = new HashMap<>(); - for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getDataSources()) { + final Map loadStatus = new HashMap<>(); + final Collection dataSources = metadataSegmentManager.getDataSources(); + + if (dataSources == null) { + return loadStatus; + } + + for (ImmutableDruidDataSource dataSource : dataSources) { final Set segments = Sets.newHashSet(dataSource.getSegments()); final int availableSegmentSize = segments.size(); @@ -453,7 +476,11 @@ public class DruidCoordinator * is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than * several times. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) */ + @Nullable public Iterable iterateAvailableDataSegments() { return metadataSegmentManager.iterateAllSegments(); @@ -643,10 +670,16 @@ public class DruidCoordinator BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. + final Collection dataSources = metadataSegmentManager.getDataSources(); + if (dataSources == null) { + log.info("Metadata store not polled yet, skipping this run."); + return; + } + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withStartTime(startTime) - .withDataSources(metadataSegmentManager.getDataSources()) + .withDataSources(dataSources) .withDynamicConfigs(getDynamicConfigs()) .withCompactionConfig(getCompactionConfig()) .withEmitter(emitter) @@ -656,6 +689,11 @@ public class DruidCoordinator // Don't read state and run state in the same helper otherwise racy conditions may exist if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { params = helper.run(params); + + if (params == null) { + // This helper wanted to cancel the run. No log message, since the helper should have logged a reason. + return; + } } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java index a7a1bcce220..2e77577b69b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.SortedSet; /** + * */ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper { @@ -45,21 +46,12 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper Set availableSegments = params.getAvailableSegments(); DruidCluster cluster = params.getDruidCluster(); - if (availableSegments.isEmpty()) { - log.info( - "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " + - "a race condition in which the coordinator would drop all segments if it started running cleanup before " + - "it finished polling the metadata storage for available segments for the first time." - ); - return params.buildFromExisting().withCoordinatorStats(stats).build(); - } - - // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might - // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any - // segments at all, we should have all of them.) - // Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped. - // This is done to prevent a race condition in which the coordinator would drop all segments if it started running - // cleanup before it finished polling the metadata storage for available segments for the first time. + // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It's + // also filled atomically, so if there are any segments at all, we should have all of them.) + // + // Note that if the metadata store has not been polled yet, "getAvailableSegments" would throw an error since + // "availableSegments" is null. But this won't happen, since the earlier helper "DruidCoordinatorSegmentInfoLoader" + // would have canceled the run. for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java index a2752c35270..78ee92bde32 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java @@ -21,7 +21,10 @@ package org.apache.druid.server.coordinator.helper; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import javax.annotation.Nullable; + /** + * */ public interface DruidCoordinatorHelper { @@ -29,8 +32,13 @@ public interface DruidCoordinatorHelper * Implementations of this method run various activities performed by the coordinator. * Input params can be used and modified. They are typically in a list and returned * DruidCoordinatorRuntimeParams is passed to the next helper. + * * @param params - * @return same as input or a modified value to be used by next helper. + * + * @return same as input or a modified value to be used by next helper. Null return + * values will prevent future DruidCoordinatorHelpers from running until the next + * cycle. */ + @Nullable DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java index 801bd3ba502..2353247e3f3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java @@ -43,6 +43,12 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper { log.info("Starting coordination. Getting available segments."); + final Iterable dataSegments = coordinator.iterateAvailableDataSegments(); + if (dataSegments == null) { + log.info("Metadata store not polled yet, canceling this run."); + return null; + } + // The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments // and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to // Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called @@ -54,7 +60,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper // //noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047 Iterable availableSegmentsWithSizeChecking = Iterables.transform( - coordinator.iterateAvailableDataSegments(), + dataSegments, segment -> { if (segment.getSize() < 0) { log.makeAlert("No size on a segment") diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index c7e270214ff..af106fb863f 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -55,6 +55,7 @@ import javax.ws.rs.core.StreamingOutput; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -96,7 +97,10 @@ public class MetadataResource @Context final HttpServletRequest req ) { - final Collection druidDataSources = metadataSegmentManager.getDataSources(); + // If we haven't polled the metadata store yet, use an empty list of datasources. + final Collection druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources()) + .orElse(Collections.emptyList()); + final Set dataSourceNamesPreAuth; if (includeDisabled != null) { dataSourceNamesPreAuth = new TreeSet<>(metadataSegmentManager.getAllDataSourceNames()); @@ -154,7 +158,10 @@ public class MetadataResource @QueryParam("datasources") final Set datasources ) { - Collection druidDataSources = metadataSegmentManager.getDataSources(); + // If we haven't polled the metadata store yet, use an empty list of datasources. + Collection druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources()) + .orElse(Collections.emptyList()); + if (datasources != null && !datasources.isEmpty()) { druidDataSources = druidDataSources.stream() .filter(src -> datasources.contains(src.getName())) diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java index a9f8f3c5df0..242dc5ecb27 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -39,6 +39,7 @@ import org.junit.Rule; import org.junit.Test; import java.io.IOException; +import java.util.stream.Collectors; public class SQLMetadataSegmentManagerTest @@ -123,10 +124,48 @@ public class SQLMetadataSegmentManagerTest ImmutableList.of("wikipedia"), manager.getAllDataSourceNames() ); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getDataSources().stream().map(d -> d.getName()).collect(Collectors.toList()) + ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), ImmutableSet.copyOf(manager.getDataSource("wikipedia").getSegments()) ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(manager.iterateAllSegments()) + ); + } + + @Test + public void testNoPoll() + { + manager.start(); + Assert.assertTrue(manager.isStarted()); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getAllDataSourceNames() + ); + Assert.assertNull(manager.getDataSources()); + Assert.assertNull(manager.getDataSource("wikipedia")); + Assert.assertNull(manager.iterateAllSegments()); + } + + @Test + public void testPollThenStop() + { + manager.start(); + manager.poll(); + manager.stop(); + Assert.assertFalse(manager.isStarted()); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getAllDataSourceNames() + ); + Assert.assertNull(manager.getDataSources()); + Assert.assertNull(manager.getDataSource("wikipedia")); + Assert.assertNull(manager.iterateAllSegments()); } @Test