From ff8e0c3397c3677b85e521f141ffcc650094ca91 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 8 Nov 2022 16:11:39 +0530 Subject: [PATCH] Fix issues with caching cost strategy (#13321) `cachingCost` strategy has some discrepancies when compared to cost strategy. This commit addresses two of these by retaining the same behaviour as the `cost` strategy when computing the cost of moving a segment to a server: - subtract the self cost of a segment if it is being served by the target server - subtract the cost of segments that are marked to be dropped Other changes: - Add tests to verify fixed strategy. These tests would fail without the fixes made to `CachingCostStrategy.computeCost()` - Fix the definition of the segment related metrics in the docs. - Fix some docs issues introduced in #13181 --- docs/operations/metrics.md | 7 +- docs/operations/rule-configuration.md | 4 +- .../CachingCostBalancerStrategy.java | 19 +- .../duty/EmitClusterStatsAndMetrics.java | 1 + .../simulate/BalancingStrategiesTest.java | 186 ++++++++++++++++++ .../simulate/CoordinatorSimulation.java | 5 + .../CoordinatorSimulationBaseTest.java | 18 ++ .../CoordinatorSimulationBuilder.java | 38 +++- .../simulate/SegmentBalancingTest.java | 3 +- .../simulate/TestServerInventoryView.java | 20 +- 10 files changed, 278 insertions(+), 23 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/simulate/BalancingStrategiesTest.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 7f913790435..2c9f4941a86 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -276,8 +276,9 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |------|-----------|----------|------------| |`segment/assigned/count`|Number of segments assigned to be loaded in the cluster.|tier.|Varies.| |`segment/moved/count`|Number of segments moved in the cluster.|tier.|Varies.| -|`segment/dropped/count`|Number of segments dropped due to being overshadowed.|tier.|Varies.| -|`segment/deleted/count`|Number of segments dropped due to rules.|tier.|Varies.| +|`segment/unmoved/count`|Number of segments which were chosen for balancing but were found to be already optimally placed.|tier.|Varies.| +|`segment/dropped/count`|Number of segments chosen to be dropped from the cluster due to being over-replicated.|tier.|Varies.| +|`segment/deleted/count`|Number of segments marked as unused due to drop rules.|tier.|Varies.| |`segment/unneeded/count`|Number of segments dropped due to being marked as unused.|tier.|Varies.| |`segment/cost/raw`|Used in cost balancing. The raw cost of hosting segments.|tier.|Varies.| |`segment/cost/normalization`|Used in cost balancing. The normalization of hosting segments.|tier.|Varies.| @@ -288,7 +289,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`segment/dropQueue/count`|Number of segments to drop.|server.|Varies.| |`segment/size`|Total size of used segments in a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|Varies.| |`segment/count`|Number of used segments belonging to a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|< max| -|`segment/overShadowed/count`|Number of overshadowed segments.| |Varies.| +|`segment/overShadowed/count`|Number of segments marked as unused due to being overshadowed.| |Varies.| |`segment/unavailable/count`|Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries.|dataSource.|0| |`segment/underReplicated/count`|Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries.|tier, dataSource.|0| |`tier/historical/count`|Number of available historical nodes in each tier.|tier.|Varies.| diff --git a/docs/operations/rule-configuration.md b/docs/operations/rule-configuration.md index b61231182f3..14df071c6df 100644 --- a/docs/operations/rule-configuration.md +++ b/docs/operations/rule-configuration.md @@ -177,7 +177,7 @@ Set the following properties: ## Drop rules -Drop rules define when Druid drops segments from the cluster. Druid keeps dropped data in deep storage. Note that if you enable segment autokill, or you run a kill task, Druid deletes the data from deep storage. See [Data deletion](../data-management/delete.md) for more information on deleting data. +Drop rules define when Druid drops segments from the cluster. Druid keeps dropped data in deep storage. Note that if you enable automatic cleanup of unused segments, or you run a kill task, Druid deletes the data from deep storage. See [Data deletion](../data-management/delete.md) for more information on deleting data. If you want to use a [load rule](#load-rules) to retain only data from a defined period of time, you must also define a drop rule. If you don't define a drop rule, Druid retains data that doesn't lie within your defined period according to the default rule, `loadForever`. @@ -305,7 +305,7 @@ Set the following property: ## Permanently delete data -Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments marked `unused`. Note that Druid always marks segments dropped from the cluster by rules as `unused`. You can submit a [kill task](./ingestion/tasks) to the [Overlord](./design/overlord) to do this. +Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments marked `unused`. Note that Druid always marks segments dropped from the cluster by rules as `unused`. You can submit a [kill task](../ingestion/tasks.md) to the [Overlord](../design/overlord.md) to do this. ## Reload dropped data diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java index 22387406f45..03f16b1082f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CachingCostBalancerStrategy.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.server.coordinator.cost.ClusterCostCache; import org.apache.druid.timeline.DataSegment; +import java.util.Collections; import java.util.Set; @@ -61,6 +62,16 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy // add segments that will be loaded to the cost cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment); + // minus the cost of the segment itself + if (server.isServingSegment(proposalSegment)) { + cost -= costCacheForSegments(server, Collections.singleton(proposalSegment)) + .computeCost(serverName, proposalSegment); + } + + // minus the costs of segments that are marked to be dropped + cost -= costCacheForSegments(server, server.getPeon().getSegmentsMarkedToDrop()) + .computeCost(serverName, proposalSegment); + if (server.getAvailableSize() <= 0) { return Double.POSITIVE_INFINITY; } @@ -72,8 +83,12 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy private ClusterCostCache costCacheForLoadingSegments(ServerHolder server) { - final Set loadingSegments = server.getPeon().getSegmentsToLoad(); - return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build(); + return costCacheForSegments(server, server.getPeon().getSegmentsToLoad()); + } + + private ClusterCostCache costCacheForSegments(ServerHolder server, Set segments) + { + return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), segments)).build(); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java index 71e0d8ab5cc..e9fa8b188a3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java @@ -188,6 +188,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty emitTieredStats(emitter, "segment/cost/normalization", stats, "normalization"); emitTieredStats(emitter, "segment/moved/count", stats, "movedCount"); + emitTieredStats(emitter, "segment/unmoved/count", stats, "unmovedCount"); emitTieredStats(emitter, "segment/deleted/count", stats, "deletedCount"); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BalancingStrategiesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BalancingStrategiesTest.java new file mode 100644 index 00000000000..fd1c6c4d0f5 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BalancingStrategiesTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.List; + +@RunWith(Parameterized.class) +public class BalancingStrategiesTest extends CoordinatorSimulationBaseTest +{ + private static final long SIZE_1TB = 1_000_000; + + private final String strategy; + private final List segments = Segments.WIKI_10X100D; + + @Parameterized.Parameters(name = "{0}") + public static String[] getTestParameters() + { + return new String[]{"cost", "cachingCost"}; + } + + public BalancingStrategiesTest(String strategy) + { + this.strategy = strategy; + } + + @Override + public void setUp() + { + + } + + @Test + public void testFreshClusterGetsBalanced() + { + final List historicals = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + historicals.add(createHistorical(i, Tier.T1, SIZE_1TB)); + } + + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withDynamicConfig(createDynamicConfig(1000, 0, 100)) + .withBalancer(strategy) + .withRules(DS.WIKI, Load.on(Tier.T1, 1).forever()) + .withServers(historicals) + .withSegments(segments) + .build(); + startSimulation(sim); + + // Run 1: all segments are assigned and loaded + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 1000L); + verifyNoEvent(Metric.MOVED_COUNT); + verifyNoEvent(Metric.UNMOVED_COUNT); + + for (DruidServer historical : historicals) { + Assert.assertEquals(200, historical.getTotalSegments()); + } + + // Run 2: nothing is assigned, nothing is moved as servers are already balanced + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 0L); + verifyValue(Metric.MOVED_COUNT, 0L); + verifyValue(Metric.UNMOVED_COUNT, 1000L); + } + + @Test + public void testClusterGetsBalancedWhenServerIsAdded() + { + final List historicals = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + historicals.add(createHistorical(i, Tier.T1, SIZE_1TB)); + } + + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withDynamicConfig(createDynamicConfig(1000, 0, 100)) + .withBalancer(strategy) + .withRules(DS.WIKI, Load.on(Tier.T1, 1).forever()) + .withServers(historicals) + .withSegments(segments) + .build(); + startSimulation(sim); + + // Run 1: all segments are assigned and loaded + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 1000L); + verifyNoEvent(Metric.MOVED_COUNT); + verifyNoEvent(Metric.UNMOVED_COUNT); + + // Verify that each server is equally loaded + for (DruidServer historical : historicals) { + Assert.assertEquals(250, historical.getTotalSegments()); + } + + // Add another historical + final DruidServer newHistorical = createHistorical(4, Tier.T1, SIZE_1TB); + addServer(newHistorical); + historicals.add(newHistorical); + + // Run the coordinator for a few cycles + for (int i = 0; i < 10; ++i) { + runCoordinatorCycle(); + loadQueuedSegments(); + } + + // Verify that the segments have been balanced + for (DruidServer historical : historicals) { + long loadedSegments = historical.getTotalSegments(); + Assert.assertTrue(loadedSegments >= 199 && loadedSegments <= 201); + } + } + + @Test + public void testClusterGetsBalancedWhenServerIsRemoved() + { + final List historicals = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + historicals.add(createHistorical(i, Tier.T1, SIZE_1TB)); + } + + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withDynamicConfig(createDynamicConfig(1000, 0, 100)) + .withBalancer(strategy) + .withRules(DS.WIKI, Load.on(Tier.T1, 1).forever()) + .withServers(historicals) + .withSegments(segments) + .build(); + startSimulation(sim); + + // Run 1: all segments are assigned and loaded + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 1000L); + verifyNoEvent(Metric.MOVED_COUNT); + verifyNoEvent(Metric.UNMOVED_COUNT); + + // Verify that each server is equally loaded + for (DruidServer historical : historicals) { + Assert.assertEquals(200, historical.getTotalSegments()); + } + + // Remove a historical + final DruidServer removedHistorical = historicals.remove(4); + removeServer(removedHistorical); + + // Run 2: the missing segments are assigned to the other servers + runCoordinatorCycle(); + loadQueuedSegments(); + int assignedCount = getValue(Metric.ASSIGNED_COUNT, null).intValue(); + Assert.assertTrue(assignedCount >= 200); + + for (DruidServer historical : historicals) { + Assert.assertEquals(250, historical.getTotalSegments()); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java index 0c2af979bb5..c4785fd0bc8 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java @@ -103,5 +103,10 @@ public interface CoordinatorSimulation * Removes the specified server from the cluster. */ void removeServer(DruidServer server); + + /** + * Adds the specified server to the cluster. + */ + void addServer(DruidServer server); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java index ee0ed6cc264..b3980630d74 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -129,6 +129,12 @@ public abstract class CoordinatorSimulationBaseTest sim.cluster().removeServer(server); } + @Override + public void addServer(DruidServer server) + { + sim.cluster().addServer(server); + } + @Override public double getLoadPercentage(String datasource) { @@ -267,6 +273,7 @@ public abstract class CoordinatorSimulationBaseTest { static final String ASSIGNED_COUNT = "segment/assigned/count"; static final String MOVED_COUNT = "segment/moved/count"; + static final String UNMOVED_COUNT = "segment/unmoved/count"; static final String DROPPED_COUNT = "segment/dropped/count"; static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count"; } @@ -283,6 +290,17 @@ public abstract class CoordinatorSimulationBaseTest .startingAt("2022-01-01") .withNumPartitions(10) .eachOfSizeInMb(500); + + /** + * Segments of datasource {@link DS#WIKI}, size 500 MB each, + * spanning 100 days containing 10 partitions. + */ + static final List WIKI_10X100D = + CreateDataSegments.ofDatasource(DS.WIKI) + .forIntervals(100, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(10) + .eachOfSizeInMb(500); } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index a5e02e98981..2e85f818452 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -26,6 +26,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.DirectExecutorService; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -40,9 +41,11 @@ import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.CostBalancerStrategyFactory; +import org.apache.druid.server.coordinator.DiskNormalizedCostBalancerStrategyFactory; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.LoadQueueTaskMaster; +import org.apache.druid.server.coordinator.RandomBalancerStrategyFactory; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.rules.Rule; @@ -76,7 +79,7 @@ public class CoordinatorSimulationBuilder ) ); - private BalancerStrategyFactory balancerStrategyFactory; + private String balancerStrategy; private CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() .withUseBatchedSegmentSampler(true) @@ -92,9 +95,9 @@ public class CoordinatorSimulationBuilder *

* Default: "cost" ({@link CostBalancerStrategyFactory}) */ - public CoordinatorSimulationBuilder withBalancer(BalancerStrategyFactory strategyFactory) + public CoordinatorSimulationBuilder withBalancer(String balancerStrategy) { - this.balancerStrategyFactory = strategyFactory; + this.balancerStrategy = balancerStrategy; return this; } @@ -209,8 +212,7 @@ public class CoordinatorSimulationBuilder Collections.emptySet(), null, new CoordinatorCustomDutyGroups(Collections.emptySet()), - balancerStrategyFactory != null ? balancerStrategyFactory - : new CostBalancerStrategyFactory(), + createBalancerStrategy(env), env.lookupCoordinatorManager, env.leaderSelector, OBJECT_MAPPER @@ -219,6 +221,26 @@ public class CoordinatorSimulationBuilder return new SimulationImpl(coordinator, env); } + private BalancerStrategyFactory createBalancerStrategy(Environment env) + { + if (balancerStrategy == null) { + return new CostBalancerStrategyFactory(); + } + + switch (balancerStrategy) { + case "cost": + return new CostBalancerStrategyFactory(); + case "cachingCost": + return buildCachingCostBalancerStrategy(env); + case "diskNormalized": + return new DiskNormalizedCostBalancerStrategyFactory(); + case "random": + return new RandomBalancerStrategyFactory(); + default: + throw new IAE("Unknown balancer stratgy: " + balancerStrategy); + } + } + private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env) { try { @@ -347,6 +369,12 @@ public class CoordinatorSimulationBuilder env.inventory.removeServer(server); } + @Override + public void addServer(DruidServer server) + { + env.inventory.addServer(server); + } + private void verifySimulationRunning() { if (!running.get()) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java index a2c4ef64228..1a90ee1eb25 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java @@ -114,10 +114,9 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest startSimulation(sim); runCoordinatorCycle(); - - // Verify that segments have been chosen for balancing verifyValue(Metric.MOVED_COUNT, 5L); + // Remove histT12 from cluster so that the move fails removeServer(historicalT12); loadQueuedSegments(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java index fedafc45c9d..df17f8beaba 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java @@ -158,20 +158,22 @@ public class TestServerInventoryView implements ServerInventoryView { log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName()); - if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) { - server.addDataSegment(segment); - segmentCallbacks.forEach( - (segmentCallback, executor) -> executor.execute( - () -> segmentCallback.segmentAdded(server.getMetadata(), segment) - ) - ); - } else { + if (server.getSegment(segment.getId()) != null) { + log.debug("Server [%s] already serving segment [%s]", server.getName(), segment); + } else if (server.getMaxSize() - server.getCurrSize() < segment.getSize()) { throw new ISE( "Not enough free space on server %s. Segment size [%d]. Current free space [%d]", server.getName(), segment.getSize(), server.getMaxSize() - server.getCurrSize() ); + } else { + server.addDataSegment(segment); + segmentCallbacks.forEach( + (segmentCallback, executor) -> executor.execute( + () -> segmentCallback.segmentAdded(server.getMetadata(), segment) + ) + ); } } @@ -185,7 +187,7 @@ public class TestServerInventoryView implements ServerInventoryView server.removeDataSegment(segment.getId()); segmentCallbacks.forEach( (segmentCallback, executor) -> executor.execute( - () -> segmentCallback.segmentAdded(server.getMetadata(), segment) + () -> segmentCallback.segmentRemoved(server.getMetadata(), segment) ) ); }