diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index 0c1b4925336..99fcde7499c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -42,10 +42,15 @@ import java.util.Map; import java.util.Set; /** - * Marks segments that are overshadowed by currently served segments as unused. + * Marks a segment as unused if it is overshadowed by: + * + *

* This duty runs only if the Coordinator has been running long enough to have a * refreshed metadata view. This duration is controlled by the dynamic config - * {@code millisToWaitBeforeDeleting}. + * {@link org.apache.druid.server.coordinator.CoordinatorDynamicConfig#markSegmentAsUnusedDelayMillis}. */ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty { @@ -67,7 +72,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty final long delayMillis = params.getCoordinatorDynamicConfig().getMarkSegmentAsUnusedDelayMillis(); if (DateTimes.nowUtc().isBefore(coordinatorStartTime.plus(delayMillis))) { log.info( - "Skipping MarkAsUnused until [%s] have elapsed after coordinator start [%s].", + "Skipping MarkAsUnused until [%s] have elapsed after coordinator start time[%s].", Duration.ofMillis(delayMillis), coordinatorStartTime ); return params; @@ -79,7 +84,7 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty return params; } - DruidCluster cluster = params.getDruidCluster(); + final DruidCluster cluster = params.getDruidCluster(); final Map timelines = new HashMap<>(); cluster.getHistoricals().values().forEach( @@ -91,7 +96,14 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty broker -> addSegmentsFromServer(broker, timelines) ); - // Note that we do not include segments from ingestion services such as tasks or indexers, + // Include all segments that require zero replicas to be loaded + params.getSegmentAssigner().getSegmentsWithZeroRequiredReplicas().forEach( + (datasource, segments) -> timelines + .computeIfAbsent(datasource, ds -> new SegmentTimeline()) + .addSegments(segments.iterator()) + ); + + // Do not include segments served by ingestion services such as tasks or indexers, // to prevent unpublished segments from prematurely overshadowing segments. // Mark all segments overshadowed by served segments as unused diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index d371cb3e40b..1c2a867c4fa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.loading; import com.google.common.collect.Sets; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.client.DruidServer; -import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; @@ -56,8 +55,6 @@ import java.util.stream.Collectors; @NotThreadSafe public class StrategicSegmentAssigner implements SegmentActionHandler { - private static final EmittingLogger log = new EmittingLogger(StrategicSegmentAssigner.class); - private final SegmentLoadQueueManager loadQueueManager; private final DruidCluster cluster; private final CoordinatorRunStats stats; @@ -71,6 +68,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private final Map> datasourceToInvalidLoadTiers = new HashMap<>(); private final Map tierToHistoricalCount = new HashMap<>(); private final Map> segmentsToDelete = new HashMap<>(); + private final Map> segmentsWithZeroRequiredReplicas = new HashMap<>(); public StrategicSegmentAssigner( SegmentLoadQueueManager loadQueueManager, @@ -109,6 +107,11 @@ public class StrategicSegmentAssigner implements SegmentActionHandler return segmentsToDelete; } + public Map> getSegmentsWithZeroRequiredReplicas() + { + return segmentsWithZeroRequiredReplicas; + } + public Map> getDatasourceToInvalidLoadTiers() { return datasourceToInvalidLoadTiers; @@ -223,6 +226,12 @@ public class StrategicSegmentAssigner implements SegmentActionHandler } SegmentReplicaCount replicaCountInCluster = replicaCountMap.getTotal(segment.getId()); + if (replicaCountInCluster.required() <= 0) { + segmentsWithZeroRequiredReplicas + .computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>()) + .add(segment); + } + final int replicaSurplus = replicaCountInCluster.loadedNotDropping() - replicaCountInCluster.requiredAndLoadable(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java index bd1af4a084a..6fd23bd8f46 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -223,6 +223,8 @@ public abstract class CoordinatorSimulationBaseTest implements static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count"; static final String DROP_QUEUE_COUNT = "segment/dropQueue/count"; static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled"; + + static final String OVERSHADOWED_COUNT = "segment/overshadowed/count"; } static class Segments diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/MarkSegmentsAsUnusedTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/MarkSegmentsAsUnusedTest.java new file mode 100644 index 00000000000..cd9ddcca1ae --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/MarkSegmentsAsUnusedTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class MarkSegmentsAsUnusedTest extends CoordinatorSimulationBaseTest +{ + @Override + public void setUp() + { + + } + + @Test + public void testSegmentsOvershadowedByZeroReplicaSegmentsAreMarkedAsUnused() + { + final long size1TB = 1_000_000; + final List servers = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + servers.add(createHistorical(i, Tier.T1, size1TB)); + } + + final List segmentsV0 = Segments.WIKI_10X1D; + final CoordinatorDynamicConfig dynamicConfig + = CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build(); + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withRules(DS.WIKI, Load.on(Tier.T1, 0).forever()) + .withServers(servers) + .withSegments(segmentsV0) + .withDynamicConfig(dynamicConfig) + .build(); + startSimulation(sim); + + // Run 1: No segment is loaded + runCoordinatorCycle(); + verifyNotEmitted(Metric.ASSIGNED_COUNT); + + // Add v1 segments to overshadow v0 segments + final List segmentsV1 = segmentsV0.stream().map( + segment -> segment.withVersion(segment.getVersion() + "__new") + ).collect(Collectors.toList()); + addSegments(segmentsV1); + + // Run 2: nothing is loaded, v0 segments are overshadowed and marked as unused + runCoordinatorCycle(); + verifyNotEmitted(Metric.ASSIGNED_COUNT); + verifyValue(Metric.OVERSHADOWED_COUNT, 10L); + } +}