diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index dbc04b06bd5..f8ea8ef24f3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -338,13 +338,6 @@ public class DruidCoordinator return CoordinatorCompactionConfig.current(configManager); } - public void markSegmentsAsUnused(String datasource, Set segmentIds) - { - log.debug("Marking [%d] segments of datasource [%s] as unused.", segmentIds.size(), datasource); - int updatedCount = segmentsMetadataManager.markSegmentsAsUnused(segmentIds); - log.info("Successfully marked [%d] segments of datasource [%s] as unused.", updatedCount, datasource); - } - public String getCurrentLeader() { return coordLeaderSelector.getCurrentLeader(); @@ -579,10 +572,10 @@ public class DruidCoordinator { return ImmutableList.of( new UpdateCoordinatorStateAndPrepareCluster(), - new RunRules(), + new RunRules(segmentsMetadataManager::markSegmentsAsUnused), new UpdateReplicationStatus(), new UnloadUnusedSegments(loadQueueManager), - new MarkOvershadowedSegmentsAsUnused(DruidCoordinator.this), + new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused), new BalanceSegments(), new CollectSegmentAndServerStats(DruidCoordinator.this) ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java index 5109ec3c3f2..1c0ff573d19 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java @@ -27,7 +27,6 @@ import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.loading.LoadQueuePeon; -import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; @@ -60,9 +59,6 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty .forEach(this::logHistoricalTierStats); collectSegmentStats(params); - StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner(); - segmentAssigner.makeAlerts(); - return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index 3cb9f0064cf..0c1b4925336 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DruidCluster; -import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -42,15 +41,21 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +/** + * Marks segments that are overshadowed by currently served segments as unused. + * This duty runs only if the Coordinator has been running long enough to have a + * refreshed metadata view. This duration is controlled by the dynamic config + * {@code millisToWaitBeforeDeleting}. + */ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty { private static final Logger log = new Logger(MarkOvershadowedSegmentsAsUnused.class); - private final DruidCoordinator coordinator; + private final SegmentDeleteHandler deleteHandler; - public MarkOvershadowedSegmentsAsUnused(DruidCoordinator coordinator) + public MarkOvershadowedSegmentsAsUnused(SegmentDeleteHandler deleteHandler) { - this.coordinator = coordinator; + this.deleteHandler = deleteHandler; } @Override @@ -104,7 +109,9 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty (datasource, unusedSegments) -> { RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource); stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size()); - coordinator.markSegmentsAsUnused(datasource, unusedSegments); + + int updatedCount = deleteHandler.markSegmentsAsUnused(unusedSegments); + log.info("Successfully marked [%d] segments of datasource[%s] as unused.", updatedCount, datasource); } ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index a5c4eb58041..60f46aca6e2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -19,9 +19,10 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordinator.DruidCluster; @@ -29,8 +30,11 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; import java.util.List; @@ -38,7 +42,9 @@ import java.util.Set; import java.util.stream.Collectors; /** - * Duty to run retention rules. + * Duty to run retention rules for all used non-overshadowed segments. + * Overshadowed segments are marked unused by {@link MarkOvershadowedSegmentsAsUnused} + * duty and are eventually unloaded from all servers by {@link UnloadUnusedSegments}. *

* The params returned from {@code run()} must have these fields initialized: *