diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index 34cd830285d..da0b8e8b2e7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -45,6 +45,7 @@ public class DruidCluster private final Set realtimes; private final Map> historicals; private final Set brokers; + private final List allServers; private DruidCluster( Set realtimes, @@ -58,6 +59,7 @@ public class DruidCluster holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders) ); this.brokers = Collections.unmodifiableSet(brokers); + this.allServers = initAllServers(); } public Set getRealtimes() @@ -85,7 +87,12 @@ public class DruidCluster return historicals.get(tier); } - public Collection getAllServers() + public List getAllServers() + { + return allServers; + } + + private List initAllServers() { final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum(); final int realtimeSize = realtimes.size(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 7e7301cd170..8bdcee3641d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -31,8 +31,10 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** * Unloads segments that are no longer marked as used from servers. @@ -56,53 +58,46 @@ public class UnloadUnusedSegments implements CoordinatorDuty broadcastStatusByDatasource.put(broadcastDatasource, true); } + final List allServers = params.getDruidCluster().getAllServers(); + int numCancelledLoads = allServers.stream().mapToInt( + server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params) + ).sum(); + final CoordinatorRunStats stats = params.getCoordinatorStats(); - params.getDruidCluster().getAllServers().forEach( - server -> handleUnusedSegmentsForServer( - server, - params, - stats, - broadcastStatusByDatasource - ) - ); + int numQueuedDrops = allServers.stream().mapToInt( + server -> dropUnusedSegments(server, params, stats, broadcastStatusByDatasource) + ).sum(); + + if (numCancelledLoads > 0 || numQueuedDrops > 0) { + log.info("Cancelled [%d] loads and started [%d] drops of unused segments.", numCancelledLoads, numQueuedDrops); + } return params; } - private void handleUnusedSegmentsForServer( + private int dropUnusedSegments( ServerHolder serverHolder, DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats, Map broadcastStatusByDatasource ) { - ImmutableDruidServer server = serverHolder.getServer(); - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { - boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent( - dataSource.getName(), - dataSourceName -> isBroadcastDatasource(dataSourceName, params) - ); + final Set usedSegments = params.getUsedSegments(); - // The coordinator tracks used segments by examining the metadata store. - // For tasks, the segments they create are unpublished, so those segments will get dropped - // unless we exclude them here. We currently drop only broadcast segments in that case. - // This check relies on the assumption that queryable stream tasks will never - // ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast - // datasource, this will result in the those segments not being dropped from tasks. - // A more robust solution which requires a larger rework could be to expose - // the set of segments that were created by a task/indexer here, and exclude them. - if (serverHolder.isRealtimeServer() && !isBroadcastDatasource) { + final AtomicInteger numQueuedDrops = new AtomicInteger(0); + final ImmutableDruidServer server = serverHolder.getServer(); + for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + if (shouldSkipUnload(serverHolder, dataSource.getName(), broadcastStatusByDatasource, params)) { continue; } int totalUnneededCount = 0; - final Set usedSegments = params.getUsedSegments(); for (DataSegment segment : dataSource.getSegments()) { if (!usedSegments.contains(segment) && loadQueueManager.dropSegment(segment, serverHolder)) { totalUnneededCount++; - log.info( - "Dropping uneeded segment [%s] from server [%s] in tier [%s]", + log.debug( + "Dropping uneeded segment[%s] from server[%s] in tier[%s]", segment.getId(), server.getName(), server.getTier() ); } @@ -110,8 +105,55 @@ public class UnloadUnusedSegments implements CoordinatorDuty if (totalUnneededCount > 0) { stats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), dataSource.getName(), totalUnneededCount); + numQueuedDrops.addAndGet(totalUnneededCount); } } + + return numQueuedDrops.get(); + } + + private int cancelLoadOfUnusedSegments( + ServerHolder server, + Map broadcastStatusByDatasource, + DruidCoordinatorRuntimeParams params + ) + { + final Set usedSegments = params.getUsedSegments(); + + final AtomicInteger cancelledOperations = new AtomicInteger(0); + server.getQueuedSegments().forEach((segment, action) -> { + if (shouldSkipUnload(server, segment.getDataSource(), broadcastStatusByDatasource, params)) { + // do nothing + } else if (usedSegments.contains(segment)) { + // do nothing + } else if (action.isLoad() && server.cancelOperation(action, segment)) { + cancelledOperations.incrementAndGet(); + } + }); + + return cancelledOperations.get(); + } + + /** + * Returns true if the given server is a realtime server AND the datasource is + * NOT a broadcast datasource. + *

+ * Realtime tasks work with unpublished segments and the tasks themselves are + * responsible for dropping those segments. However, segments belonging to a + * broadcast datasource should still be dropped by the Coordinator as realtime + * tasks do not ingest data to a broadcast datasource and are thus not + * responsible for the load/unload of those segments. + */ + private boolean shouldSkipUnload( + ServerHolder server, + String dataSource, + Map broadcastStatusByDatasource, + DruidCoordinatorRuntimeParams params + ) + { + boolean isBroadcastDatasource = broadcastStatusByDatasource + .computeIfAbsent(dataSource, ds -> isBroadcastDatasource(ds, params)); + return server.isRealtimeServer() && !isBroadcastDatasource; } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java similarity index 96% rename from server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java rename to server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java index 62f84b631a3..a3c769ef96e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java @@ -17,14 +17,14 @@ * under the License. */ -package org.apache.druid.server.coordinator; +package org.apache.druid.server.coordinator.loading; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.IndexIO; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector; -import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; +import org.apache.druid.server.coordinator.DruidCluster; +import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.Assert; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java index c0565c19ba9..e0dc5b128bf 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.metrics.MetricsVerifier; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; import java.util.List; @@ -74,6 +75,11 @@ public interface CoordinatorSimulation */ void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig); + /** + * Sets the retention rules for the given datasource. + */ + void setRetentionRules(String datasource, Rule... rules); + /** * Gets the inventory view of the specified server as maintained by the * coordinator. diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java index c9343da85a6..984fe217afb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -26,6 +26,7 @@ import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.ForeverDropRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.coordinator.stats.Dimension; @@ -112,6 +113,12 @@ public abstract class CoordinatorSimulationBaseTest implements sim.coordinator().setDynamicConfig(dynamicConfig); } + @Override + public void setRetentionRules(String datasource, Rule... rules) + { + sim.coordinator().setRetentionRules(datasource, rules); + } + @Override public void loadQueuedSegments() { @@ -211,6 +218,8 @@ public abstract class CoordinatorSimulationBaseTest implements static final String ASSIGNED_COUNT = "segment/assigned/count"; static final String MOVED_COUNT = "segment/moved/count"; static final String DROPPED_COUNT = "segment/dropped/count"; + static final String OVERSHADOWED_COUNT = "segment/overshadowed/count"; + static final String DELETED_COUNT = "segment/deleted/count"; static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count"; static final String DROP_QUEUE_COUNT = "segment/dropQueue/count"; static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled"; @@ -288,4 +297,15 @@ public abstract class CoordinatorSimulationBaseTest implements return new ForeverBroadcastDistributionRule(); } } + + /** + * Builder for a drop rule. + */ + static class Drop + { + static Rule forever() + { + return new ForeverDropRule(); + } + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 43f7bd9872a..c5d5d94293f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.simulate; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import org.apache.druid.audit.AuditInfo; import org.apache.druid.client.DruidServer; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.discovery.ServiceAnnouncer; @@ -330,6 +331,16 @@ public class CoordinatorSimulationBuilder env.setDynamicConfig(dynamicConfig); } + @Override + public void setRetentionRules(String datasource, Rule... rules) + { + env.ruleManager.overrideRule( + datasource, + Arrays.asList(rules), + new AuditInfo("sim", "sim", "localhost") + ); + } + @Override public DruidServer getInventoryView(String serverName) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java index f0ea5cc9ae5..4b7965e9596 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java @@ -482,6 +482,33 @@ public class SegmentLoadingTest extends CoordinatorSimulationBaseTest Assert.assertEquals(0, historicalT12.getTotalSegments()); } + @Test + public void testLoadOfUnusedSegmentIsCancelled() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .build(); + + startSimulation(sim); + + // Run 1: All segments are assigned + runCoordinatorCycle(); + verifyValue(Metric.ASSIGNED_COUNT, 10L); + + // Run 2: Update rules, all segments are marked as unused + setRetentionRules(datasource, Drop.forever()); + runCoordinatorCycle(); + verifyValue(Metric.DELETED_COUNT, 10L); + + // Run 3: Loads of unused segments are cancelled + runCoordinatorCycle(); + verifyValue(Metric.LOAD_QUEUE_COUNT, 0L); + verifyValue(Metric.CANCELLED_ACTIONS, 10L); + } + @Test public void testSegmentsAreDroppedFromFullServersFirst() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index ac7f2dacb66..e0d1d887efe 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -127,13 +127,22 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager ++numModifiedSegments; } } + + if (numModifiedSegments > 0) { + snapshot = null; + } return numModifiedSegments; } @Override public boolean markSegmentAsUnused(SegmentId segmentId) { - return usedSegments.remove(segmentId.toString()) != null; + boolean updated = usedSegments.remove(segmentId.toString()) != null; + if (updated) { + snapshot = null; + } + + return updated; } @Nullable