Fix issues with caching cost strategy (#13321)

`cachingCost` strategy has some discrepancies when compared to cost strategy.
This commit addresses two of these by retaining the same behaviour as the `cost` strategy
when computing the cost of moving a segment to a server:
- subtract the self cost of a segment if it is being served by the target server
- subtract the cost of segments that are marked to be dropped

Other changes:
- Add tests to verify fixed strategy. These tests would fail without the fixes made to `CachingCostStrategy.computeCost()`
- Fix the definition of the segment related metrics in the docs.
- Fix some docs issues introduced in #13181
This commit is contained in:
Kashif Faraz 2022-11-08 16:11:39 +05:30 committed by GitHub
parent 594545da55
commit ff8e0c3397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 278 additions and 23 deletions

View File

@ -276,8 +276,9 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|------|-----------|----------|------------|
|`segment/assigned/count`|Number of segments assigned to be loaded in the cluster.|tier.|Varies.|
|`segment/moved/count`|Number of segments moved in the cluster.|tier.|Varies.|
|`segment/dropped/count`|Number of segments dropped due to being overshadowed.|tier.|Varies.|
|`segment/deleted/count`|Number of segments dropped due to rules.|tier.|Varies.|
|`segment/unmoved/count`|Number of segments which were chosen for balancing but were found to be already optimally placed.|tier.|Varies.|
|`segment/dropped/count`|Number of segments chosen to be dropped from the cluster due to being over-replicated.|tier.|Varies.|
|`segment/deleted/count`|Number of segments marked as unused due to drop rules.|tier.|Varies.|
|`segment/unneeded/count`|Number of segments dropped due to being marked as unused.|tier.|Varies.|
|`segment/cost/raw`|Used in cost balancing. The raw cost of hosting segments.|tier.|Varies.|
|`segment/cost/normalization`|Used in cost balancing. The normalization of hosting segments.|tier.|Varies.|
@ -288,7 +289,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`segment/dropQueue/count`|Number of segments to drop.|server.|Varies.|
|`segment/size`|Total size of used segments in a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|Varies.|
|`segment/count`|Number of used segments belonging to a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|< max|
|`segment/overShadowed/count`|Number of overshadowed segments.| |Varies.|
|`segment/overShadowed/count`|Number of segments marked as unused due to being overshadowed.| |Varies.|
|`segment/unavailable/count`|Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries.|dataSource.|0|
|`segment/underReplicated/count`|Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries.|tier, dataSource.|0|
|`tier/historical/count`|Number of available historical nodes in each tier.|tier.|Varies.|

View File

@ -177,7 +177,7 @@ Set the following properties:
## Drop rules
Drop rules define when Druid drops segments from the cluster. Druid keeps dropped data in deep storage. Note that if you enable segment autokill, or you run a kill task, Druid deletes the data from deep storage. See [Data deletion](../data-management/delete.md) for more information on deleting data.
Drop rules define when Druid drops segments from the cluster. Druid keeps dropped data in deep storage. Note that if you enable automatic cleanup of unused segments, or you run a kill task, Druid deletes the data from deep storage. See [Data deletion](../data-management/delete.md) for more information on deleting data.
If you want to use a [load rule](#load-rules) to retain only data from a defined period of time, you must also define a drop rule. If you don't define a drop rule, Druid retains data that doesn't lie within your defined period according to the default rule, `loadForever`.
@ -305,7 +305,7 @@ Set the following property:
## Permanently delete data
Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments marked `unused`. Note that Druid always marks segments dropped from the cluster by rules as `unused`. You can submit a [kill task](./ingestion/tasks) to the [Overlord](./design/overlord) to do this.
Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments marked `unused`. Note that Druid always marks segments dropped from the cluster by rules as `unused`. You can submit a [kill task](../ingestion/tasks.md) to the [Overlord](../design/overlord.md) to do this.
## Reload dropped data

View File

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.server.coordinator.cost.ClusterCostCache;
import org.apache.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Set;
@ -61,6 +62,16 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy
// add segments that will be loaded to the cost
cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment);
// minus the cost of the segment itself
if (server.isServingSegment(proposalSegment)) {
cost -= costCacheForSegments(server, Collections.singleton(proposalSegment))
.computeCost(serverName, proposalSegment);
}
// minus the costs of segments that are marked to be dropped
cost -= costCacheForSegments(server, server.getPeon().getSegmentsMarkedToDrop())
.computeCost(serverName, proposalSegment);
if (server.getAvailableSize() <= 0) {
return Double.POSITIVE_INFINITY;
}
@ -72,8 +83,12 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy
private ClusterCostCache costCacheForLoadingSegments(ServerHolder server)
{
final Set<DataSegment> loadingSegments = server.getPeon().getSegmentsToLoad();
return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build();
return costCacheForSegments(server, server.getPeon().getSegmentsToLoad());
}
private ClusterCostCache costCacheForSegments(ServerHolder server, Set<DataSegment> segments)
{
return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), segments)).build();
}
}

View File

@ -188,6 +188,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
emitTieredStats(emitter, "segment/cost/normalization", stats, "normalization");
emitTieredStats(emitter, "segment/moved/count", stats, "movedCount");
emitTieredStats(emitter, "segment/unmoved/count", stats, "unmovedCount");
emitTieredStats(emitter, "segment/deleted/count", stats, "deletedCount");

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.simulate;
import org.apache.druid.client.DruidServer;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.List;
@RunWith(Parameterized.class)
public class BalancingStrategiesTest extends CoordinatorSimulationBaseTest
{
private static final long SIZE_1TB = 1_000_000;
private final String strategy;
private final List<DataSegment> segments = Segments.WIKI_10X100D;
@Parameterized.Parameters(name = "{0}")
public static String[] getTestParameters()
{
return new String[]{"cost", "cachingCost"};
}
public BalancingStrategiesTest(String strategy)
{
this.strategy = strategy;
}
@Override
public void setUp()
{
}
@Test
public void testFreshClusterGetsBalanced()
{
final List<DruidServer> historicals = new ArrayList<>();
for (int i = 0; i < 5; i++) {
historicals.add(createHistorical(i, Tier.T1, SIZE_1TB));
}
CoordinatorSimulation sim =
CoordinatorSimulation.builder()
.withDynamicConfig(createDynamicConfig(1000, 0, 100))
.withBalancer(strategy)
.withRules(DS.WIKI, Load.on(Tier.T1, 1).forever())
.withServers(historicals)
.withSegments(segments)
.build();
startSimulation(sim);
// Run 1: all segments are assigned and loaded
runCoordinatorCycle();
loadQueuedSegments();
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
verifyNoEvent(Metric.MOVED_COUNT);
verifyNoEvent(Metric.UNMOVED_COUNT);
for (DruidServer historical : historicals) {
Assert.assertEquals(200, historical.getTotalSegments());
}
// Run 2: nothing is assigned, nothing is moved as servers are already balanced
runCoordinatorCycle();
loadQueuedSegments();
verifyValue(Metric.ASSIGNED_COUNT, 0L);
verifyValue(Metric.MOVED_COUNT, 0L);
verifyValue(Metric.UNMOVED_COUNT, 1000L);
}
@Test
public void testClusterGetsBalancedWhenServerIsAdded()
{
final List<DruidServer> historicals = new ArrayList<>();
for (int i = 0; i < 4; i++) {
historicals.add(createHistorical(i, Tier.T1, SIZE_1TB));
}
CoordinatorSimulation sim =
CoordinatorSimulation.builder()
.withDynamicConfig(createDynamicConfig(1000, 0, 100))
.withBalancer(strategy)
.withRules(DS.WIKI, Load.on(Tier.T1, 1).forever())
.withServers(historicals)
.withSegments(segments)
.build();
startSimulation(sim);
// Run 1: all segments are assigned and loaded
runCoordinatorCycle();
loadQueuedSegments();
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
verifyNoEvent(Metric.MOVED_COUNT);
verifyNoEvent(Metric.UNMOVED_COUNT);
// Verify that each server is equally loaded
for (DruidServer historical : historicals) {
Assert.assertEquals(250, historical.getTotalSegments());
}
// Add another historical
final DruidServer newHistorical = createHistorical(4, Tier.T1, SIZE_1TB);
addServer(newHistorical);
historicals.add(newHistorical);
// Run the coordinator for a few cycles
for (int i = 0; i < 10; ++i) {
runCoordinatorCycle();
loadQueuedSegments();
}
// Verify that the segments have been balanced
for (DruidServer historical : historicals) {
long loadedSegments = historical.getTotalSegments();
Assert.assertTrue(loadedSegments >= 199 && loadedSegments <= 201);
}
}
@Test
public void testClusterGetsBalancedWhenServerIsRemoved()
{
final List<DruidServer> historicals = new ArrayList<>();
for (int i = 0; i < 5; i++) {
historicals.add(createHistorical(i, Tier.T1, SIZE_1TB));
}
CoordinatorSimulation sim =
CoordinatorSimulation.builder()
.withDynamicConfig(createDynamicConfig(1000, 0, 100))
.withBalancer(strategy)
.withRules(DS.WIKI, Load.on(Tier.T1, 1).forever())
.withServers(historicals)
.withSegments(segments)
.build();
startSimulation(sim);
// Run 1: all segments are assigned and loaded
runCoordinatorCycle();
loadQueuedSegments();
verifyValue(Metric.ASSIGNED_COUNT, 1000L);
verifyNoEvent(Metric.MOVED_COUNT);
verifyNoEvent(Metric.UNMOVED_COUNT);
// Verify that each server is equally loaded
for (DruidServer historical : historicals) {
Assert.assertEquals(200, historical.getTotalSegments());
}
// Remove a historical
final DruidServer removedHistorical = historicals.remove(4);
removeServer(removedHistorical);
// Run 2: the missing segments are assigned to the other servers
runCoordinatorCycle();
loadQueuedSegments();
int assignedCount = getValue(Metric.ASSIGNED_COUNT, null).intValue();
Assert.assertTrue(assignedCount >= 200);
for (DruidServer historical : historicals) {
Assert.assertEquals(250, historical.getTotalSegments());
}
}
}

View File

@ -103,5 +103,10 @@ public interface CoordinatorSimulation
* Removes the specified server from the cluster.
*/
void removeServer(DruidServer server);
/**
* Adds the specified server to the cluster.
*/
void addServer(DruidServer server);
}
}

View File

@ -129,6 +129,12 @@ public abstract class CoordinatorSimulationBaseTest
sim.cluster().removeServer(server);
}
@Override
public void addServer(DruidServer server)
{
sim.cluster().addServer(server);
}
@Override
public double getLoadPercentage(String datasource)
{
@ -267,6 +273,7 @@ public abstract class CoordinatorSimulationBaseTest
{
static final String ASSIGNED_COUNT = "segment/assigned/count";
static final String MOVED_COUNT = "segment/moved/count";
static final String UNMOVED_COUNT = "segment/unmoved/count";
static final String DROPPED_COUNT = "segment/dropped/count";
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
}
@ -283,6 +290,17 @@ public abstract class CoordinatorSimulationBaseTest
.startingAt("2022-01-01")
.withNumPartitions(10)
.eachOfSizeInMb(500);
/**
* Segments of datasource {@link DS#WIKI}, size 500 MB each,
* spanning 100 days containing 10 partitions.
*/
static final List<DataSegment> WIKI_10X100D =
CreateDataSegments.ofDatasource(DS.WIKI)
.forIntervals(100, Granularities.DAY)
.startingAt("2022-01-01")
.withNumPartitions(10)
.eachOfSizeInMb(500);
}
/**

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -40,9 +41,11 @@ import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.DiskNormalizedCostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.rules.Rule;
@ -76,7 +79,7 @@ public class CoordinatorSimulationBuilder
)
);
private BalancerStrategyFactory balancerStrategyFactory;
private String balancerStrategy;
private CoordinatorDynamicConfig dynamicConfig =
CoordinatorDynamicConfig.builder()
.withUseBatchedSegmentSampler(true)
@ -92,9 +95,9 @@ public class CoordinatorSimulationBuilder
* <p>
* Default: "cost" ({@link CostBalancerStrategyFactory})
*/
public CoordinatorSimulationBuilder withBalancer(BalancerStrategyFactory strategyFactory)
public CoordinatorSimulationBuilder withBalancer(String balancerStrategy)
{
this.balancerStrategyFactory = strategyFactory;
this.balancerStrategy = balancerStrategy;
return this;
}
@ -209,8 +212,7 @@ public class CoordinatorSimulationBuilder
Collections.emptySet(),
null,
new CoordinatorCustomDutyGroups(Collections.emptySet()),
balancerStrategyFactory != null ? balancerStrategyFactory
: new CostBalancerStrategyFactory(),
createBalancerStrategy(env),
env.lookupCoordinatorManager,
env.leaderSelector,
OBJECT_MAPPER
@ -219,6 +221,26 @@ public class CoordinatorSimulationBuilder
return new SimulationImpl(coordinator, env);
}
private BalancerStrategyFactory createBalancerStrategy(Environment env)
{
if (balancerStrategy == null) {
return new CostBalancerStrategyFactory();
}
switch (balancerStrategy) {
case "cost":
return new CostBalancerStrategyFactory();
case "cachingCost":
return buildCachingCostBalancerStrategy(env);
case "diskNormalized":
return new DiskNormalizedCostBalancerStrategyFactory();
case "random":
return new RandomBalancerStrategyFactory();
default:
throw new IAE("Unknown balancer stratgy: " + balancerStrategy);
}
}
private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env)
{
try {
@ -347,6 +369,12 @@ public class CoordinatorSimulationBuilder
env.inventory.removeServer(server);
}
@Override
public void addServer(DruidServer server)
{
env.inventory.addServer(server);
}
private void verifySimulationRunning()
{
if (!running.get()) {

View File

@ -114,10 +114,9 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
startSimulation(sim);
runCoordinatorCycle();
// Verify that segments have been chosen for balancing
verifyValue(Metric.MOVED_COUNT, 5L);
// Remove histT12 from cluster so that the move fails
removeServer(historicalT12);
loadQueuedSegments();

View File

@ -158,20 +158,22 @@ public class TestServerInventoryView implements ServerInventoryView
{
log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName());
if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) {
server.addDataSegment(segment);
segmentCallbacks.forEach(
(segmentCallback, executor) -> executor.execute(
() -> segmentCallback.segmentAdded(server.getMetadata(), segment)
)
);
} else {
if (server.getSegment(segment.getId()) != null) {
log.debug("Server [%s] already serving segment [%s]", server.getName(), segment);
} else if (server.getMaxSize() - server.getCurrSize() < segment.getSize()) {
throw new ISE(
"Not enough free space on server %s. Segment size [%d]. Current free space [%d]",
server.getName(),
segment.getSize(),
server.getMaxSize() - server.getCurrSize()
);
} else {
server.addDataSegment(segment);
segmentCallbacks.forEach(
(segmentCallback, executor) -> executor.execute(
() -> segmentCallback.segmentAdded(server.getMetadata(), segment)
)
);
}
}
@ -185,7 +187,7 @@ public class TestServerInventoryView implements ServerInventoryView
server.removeDataSegment(segment.getId());
segmentCallbacks.forEach(
(segmentCallback, executor) -> executor.execute(
() -> segmentCallback.segmentAdded(server.getMetadata(), segment)
() -> segmentCallback.segmentRemoved(server.getMetadata(), segment)
)
);
}