From fe2f65642785ca80452126716ac287455e8a1c02 Mon Sep 17 00:00:00 2001
From: Jonathan Wei
Date: Fri, 12 Jun 2020 02:33:28 -0700
Subject: [PATCH] Fix broadcast rule drop and docs (#10019)

* Fix broadcast rule drop and docs

* Remove racy test check

* Don't drop non-broadcast segments on tasks, add overshadowing handling

* Don't use realtimes for overshadowing

* Fix dropping for ingestion services
---
 docs/operations/rule-configuration.md         |  18 +-
 .../MarkAsUnusedOvershadowedSegments.java     |  35 +-
 .../server/coordinator/duty/RunRules.java     |  21 +-
 .../duty/UnloadUnusedSegments.java            | 118 +++++-
 .../MarkAsUnusedOvershadowedSegmentsTest.java |  16 +-
 .../duty/UnloadUnusedSegmentsTest.java        | 364 ++++++++++++++++++
 6 files changed, 521 insertions(+), 51 deletions(-)
 create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java

diff --git a/docs/operations/rule-configuration.md b/docs/operations/rule-configuration.md
index 4228994a30d..1ea074f7239 100644
--- a/docs/operations/rule-configuration.md
+++ b/docs/operations/rule-configuration.md
@@ -170,8 +170,9 @@ The interval of a segment will be compared against the specified period. The per
 
 ## Broadcast Rules
 
-Broadcast rules indicate how segments of different datasources should be co-located in Historical processes.
-Once a broadcast rule is configured for a datasource, all segments of the datasource are broadcasted to the servers holding _any segments_ of the co-located datasources.
+Broadcast rules indicate that segments of a data source should be loaded by all servers of a cluster of the following types: historicals, brokers, tasks, and indexers.
+
+Note that the broadcast segments are only directly queryable through the historicals, but they are currently loaded on other server types to support join queries.
 
 ### Forever Broadcast Rule
 
@@ -179,13 +180,13 @@ ### Forever Broadcast Rule
 
 ```json
 {
-  "type" : "broadcastForever",
-  "colocatedDataSources" : [ "target_source1", "target_source2" ]
+  "type" : "broadcastForever"
 }
 ```
 
 * `type` - this should always be "broadcastForever"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
+
+This rule applies to all segments of a datasource, covering all intervals.
 
 ### Interval Broadcast Rule
 
@@ -194,13 +195,11 @@ ### Interval Broadcast Rule
 ```json
 {
   "type" : "broadcastByInterval",
-  "colocatedDataSources" : [ "target_source1", "target_source2" ],
   "interval" : "2012-01-01/2013-01-01"
 }
 ```
 
 * `type` - this should always be "broadcastByInterval"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
 * `interval` - A JSON Object representing ISO-8601 Periods. Only the segments of the interval will be broadcasted.
 
 ### Period Broadcast Rule
 
@@ -210,22 +209,17 @@ ### Period Broadcast Rule
 ```json
 {
   "type" : "broadcastByPeriod",
-  "colocatedDataSources" : [ "target_source1", "target_source2" ],
   "period" : "P1M",
   "includeFuture" : true
 }
 ```
 
 * `type` - this should always be "broadcastByPeriod"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
 * `period` - A JSON Object representing ISO-8601 Periods
 * `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional, Default is true.
 
 The interval of a segment will be compared against the specified period. The period is from some time in the past to the future or to the current time, which depends on `includeFuture` is true or false. The rule matches if the period *overlaps* the interval.
 
-> broadcast rules don't guarantee that segments of the datasources are always co-located because segments for the colocated datasources are not loaded together atomically.
-> If you want to always co-locate the segments of some datasources together, it is recommended to leave colocatedDataSources empty.
-
 ## Permanently deleting data
 
 Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any
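
(A minimal sketch, not part of the patch, of what the simplified rule shape above serializes to; it assumes druid-server plus the DefaultObjectMapper from druid-core on the classpath, and the class name and main() wrapper are illustrative only.)

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;

import java.util.List;

public class BroadcastRuleJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();

    // After this patch, a broadcast rule carries no colocatedDataSources field.
    List<Rule> rules = ImmutableList.of(new ForeverBroadcastDistributionRule());

    // Should print something like: [{"type":"broadcastForever"}]
    System.out.println(mapper.writeValueAsString(rules));
  }
}
```
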
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
index febf9a4deaf..e278a758230 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java
@@ -58,19 +58,17 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
 
     for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serverHolders) {
-        ImmutableDruidServer server = serverHolder.getServer();
-
-        for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-          VersionedIntervalTimeline<String, DataSegment> timeline = timelines
-              .computeIfAbsent(
-                  dataSource.getName(),
-                  dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
-              );
-          VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
-        }
+        addSegmentsFromServer(serverHolder, timelines);
       }
     }
 
+    for (ServerHolder serverHolder : cluster.getBrokers()) {
+      addSegmentsFromServer(serverHolder, timelines);
+    }
+
+    // Note that we do not include segments from ingestion services such as tasks or indexers,
+    // to prevent unpublished segments from prematurely overshadowing segments.
+
     // Mark all segments as unused in db that are overshadowed by served segments
     for (DataSegment dataSegment : params.getUsedSegments()) {
       VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
@@ -83,4 +81,21 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
 
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
+
+  private void addSegmentsFromServer(
+      ServerHolder serverHolder,
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines
+  )
+  {
+    ImmutableDruidServer server = serverHolder.getServer();
+
+    for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+      VersionedIntervalTimeline<String, DataSegment> timeline = timelines
+          .computeIfAbsent(
+              dataSource.getName(),
+              dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
+          );
+      VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
+    }
+  }
 }
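
(For reference, a self-contained sketch, not part of the patch, of the timeline behavior this duty leans on: once a newer-version segment covers an interval, lookup() no longer returns the older version, which is what lets the duty mark the older segment unused; all segment values are toy data.)

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;

public class OvershadowSketch
{
  private static DataSegment segment(String version)
  {
    return new DataSegment(
        "ds",
        Intervals.of("2012-01-01/2012-01-02"),
        version,
        new HashMap<>(),
        new ArrayList<>(),
        new ArrayList<>(),
        NoneShardSpec.instance(),
        0,
        1L
    );
  }

  public static void main(String[] args)
  {
    VersionedIntervalTimeline<String, DataSegment> timeline =
        new VersionedIntervalTimeline<>(Comparator.naturalOrder());
    VersionedIntervalTimeline.addSegments(
        timeline,
        ImmutableList.of(segment("v1"), segment("v2")).iterator()
    );

    // Only the newest version is visible; prints "v2".
    timeline.lookup(Intervals.of("2012-01-01/2012-01-02"))
            .forEach(holder -> System.out.println(holder.getVersion()));
  }
}
```
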
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index 3dc7b4d2f91..d8c207148ff 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataRuleManager;
@@ -103,7 +104,21 @@ public class RunRules implements CoordinatorDuty
 
     final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
     int missingRules = 0;
 
+    final Set<String> broadcastDatasources = new HashSet<>();
+    for (ImmutableDruidDataSource dataSource : params.getDataSourcesSnapshot().getDataSourcesMap().values()) {
+      List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSource.getName());
+      for (Rule rule : rules) {
+        // A datasource is considered a broadcast datasource if it has any broadcast rules.
+        // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
+        // executes before BalanceSegments.
+        if (rule instanceof BroadcastDistributionRule) {
+          broadcastDatasources.add(dataSource.getName());
+          break;
+        }
+      }
+    }
+
     for (DataSegment segment : params.getUsedSegments()) {
       if (overshadowed.contains(segment.getId())) {
         // Skipping overshadowed segments
@@ -115,12 +130,6 @@ public class RunRules implements CoordinatorDuty
         if (rule.appliesTo(segment, now)) {
           stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
           foundMatchingRule = true;
-
-          // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
-          // executes before BalanceSegments
-          if (rule instanceof BroadcastDistributionRule) {
-            broadcastDatasources.add(segment.getDataSource());
-          }
           break;
         }
       }
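
(The broadcast check added above reduces to "any rule is a BroadcastDistributionRule"; a hypothetical stream-based equivalent, not part of the patch, for illustration.)

```java
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;

import java.util.List;

public final class BroadcastRuleCheck
{
  private BroadcastRuleCheck()
  {
  }

  // Mirrors the loop in RunRules: a datasource counts as "broadcast"
  // if any rule returned by getRulesWithDefault() is a broadcast rule.
  public static boolean isBroadcastDatasource(List<Rule> rulesWithDefault)
  {
    return rulesWithDefault.stream().anyMatch(rule -> rule instanceof BroadcastDistributionRule);
  }
}
```
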
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index bea7a9d1cc7..bd8b2c30d55 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -27,13 +27,18 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.LoadQueuePeon;
 import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 
 /**
- * Unloads segments that are no longer marked as used from Historical servers.
+ * Unloads segments that are no longer marked as used from servers.
  */
 public class UnloadUnusedSegments implements CoordinatorDuty
 {
@@ -46,31 +51,102 @@ public class UnloadUnusedSegments implements CoordinatorDuty
     CoordinatorStats stats = new CoordinatorStats();
     Set<DataSegment> usedSegments = params.getUsedSegments();
     DruidCluster cluster = params.getDruidCluster();
 
+    Map<String, Boolean> broadcastStatusByDatasource = new HashMap<>();
+    for (String broadcastDatasource : params.getBroadcastDatasources()) {
+      broadcastStatusByDatasource.put(broadcastDatasource, true);
+    }
+
     for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serverHolders) {
-        ImmutableDruidServer server = serverHolder.getServer();
-
-        for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-          for (DataSegment segment : dataSource.getSegments()) {
-            if (!usedSegments.contains(segment)) {
-              LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
-
-              if (!queuePeon.getSegmentsToDrop().contains(segment)) {
-                queuePeon.dropSegment(segment, () -> {});
-                stats.addToTieredStat("unneededCount", server.getTier(), 1);
-                log.info(
-                    "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
-                    segment.getId(),
-                    server.getName(),
-                    server.getTier()
-                );
-              }
-            }
-          }
-        }
+        handleUnusedSegmentsForServer(
+            serverHolder,
+            usedSegments,
+            params,
+            stats,
+            false,
+            broadcastStatusByDatasource
+        );
       }
     }
 
+    for (ServerHolder serverHolder : cluster.getBrokers()) {
+      handleUnusedSegmentsForServer(
+          serverHolder,
+          usedSegments,
+          params,
+          stats,
+          false,
+          broadcastStatusByDatasource
+      );
+    }
+
+    for (ServerHolder serverHolder : cluster.getRealtimes()) {
+      handleUnusedSegmentsForServer(
+          serverHolder,
+          usedSegments,
+          params,
+          stats,
+          true,
+          broadcastStatusByDatasource
+      );
+    }
+
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
+
+  private void handleUnusedSegmentsForServer(
+      ServerHolder serverHolder,
+      Set<DataSegment> usedSegments,
+      DruidCoordinatorRuntimeParams params,
+      CoordinatorStats stats,
+      boolean dropBroadcastOnly,
+      Map<String, Boolean> broadcastStatusByDatasource
+  )
+  {
+    ImmutableDruidServer server = serverHolder.getServer();
+    for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+      boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent(
+          dataSource.getName(),
+          (dataSourceName) -> {
+            List<Rule> rules = params.getDatabaseRuleManager().getRulesWithDefault(dataSource.getName());
+            for (Rule rule : rules) {
+              // A datasource is considered a broadcast datasource if it has any broadcast rules.
+              if (rule instanceof BroadcastDistributionRule) {
+                return true;
+              }
+            }
+            return false;
+          }
+      );
+
+      // The coordinator tracks used segments by examining the metadata store.
+      // For tasks, the segments they create are unpublished, so those segments will get dropped
+      // unless we exclude them here. We currently drop only broadcast segments in that case.
+      // This check relies on the assumption that queryable stream tasks will never
+      // ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
+      // datasource, this will result in those segments not being dropped from tasks.
+      // A more robust solution which requires a larger rework could be to expose
+      // the set of segments that were created by a task/indexer here, and exclude them.
+      if (dropBroadcastOnly && !isBroadcastDatasource) {
+        continue;
+      }
+
+      for (DataSegment segment : dataSource.getSegments()) {
+        if (!usedSegments.contains(segment)) {
+          LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
+
+          if (!queuePeon.getSegmentsToDrop().contains(segment)) {
+            queuePeon.dropSegment(segment, () -> {});
+            stats.addToTieredStat("unneededCount", server.getTier(), 1);
+            log.info(
+                "Dropping unneeded segment [%s] from server [%s] in tier [%s]",
+                segment.getId(),
+                server.getName(),
+                server.getTier()
+            );
+          }
+        }
+      }
+    }
+  }
 }
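
(A plain-JDK toy, not Druid code, illustrating the caching pattern above: pre-seeding known broadcast datasources and memoizing the rest with computeIfAbsent keeps rule lookups to at most one per datasource per duty run.)

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastStatusCacheSketch
{
  public static void main(String[] args)
  {
    Map<String, Boolean> broadcastStatus = new HashMap<>();
    broadcastStatus.put("knownBroadcastDs", true); // stands in for params.getBroadcastDatasources()

    for (String ds : new String[]{"knownBroadcastDs", "otherDs", "otherDs"}) {
      boolean isBroadcast = broadcastStatus.computeIfAbsent(ds, dsName -> {
        // Stands in for the MetadataRuleManager lookup; note it runs only once for "otherDs".
        System.out.println("rule lookup for " + dsName);
        return false;
      });
      System.out.println(ds + " -> " + isBroadcast);
    }
  }
}
```
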
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
index 73b95ee960c..301b3bd8f9f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.DateTimes;
@@ -39,9 +41,11 @@ import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.util.List;
 
+@RunWith(JUnitParamsRunner.class)
 public class MarkAsUnusedOvershadowedSegmentsTest
 {
   MarkAsUnusedOvershadowedSegments markAsUnusedOvershadowedSegments;
@@ -69,8 +73,16 @@ public class MarkAsUnusedOvershadowedSegmentsTest
       .build();
 
   @Test
-  public void testRun()
+  @Parameters(
+      {
+          "historical",
+          "broker"
+      }
+  )
+  public void testRun(String serverTypeString)
   {
+    ServerType serverType = ServerType.fromString(serverTypeString);
+
     markAsUnusedOvershadowedSegments = new MarkAsUnusedOvershadowedSegments(coordinator);
 
     usedSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
@@ -95,7 +107,7 @@ public class MarkAsUnusedOvershadowedSegmentsTest
             .andReturn("")
             .anyTimes();
     EasyMock.expect(druidServer.getType())
-            .andReturn(ServerType.HISTORICAL)
+            .andReturn(serverType)
             .anyTimes();
 
     EasyMock.expect(druidServer.getDataSources())
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
new file mode 100644
index 00000000000..f74762c7ad9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ImmutableDruidServerTests;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
+import org.apache.druid.server.coordinator.CoordinatorStats;
+import org.apache.druid.server.coordinator.DruidClusterBuilder;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+public class UnloadUnusedSegmentsTest
+{
+  private DruidCoordinator coordinator;
+  private ImmutableDruidServer historicalServer;
+  private ImmutableDruidServer historicalServerTier2;
+  private ImmutableDruidServer brokerServer;
+  private ImmutableDruidServer indexerServer;
+  private LoadQueuePeonTester historicalPeon;
+  private LoadQueuePeonTester historicalTier2Peon;
+  private LoadQueuePeonTester brokerPeon;
+  private LoadQueuePeonTester indexerPeon;
+  private DataSegment segment1;
+  private DataSegment segment2;
+  private DataSegment broadcastSegment;
+  private DataSegment realtimeOnlySegment;
+  private List<DataSegment> segments;
+  private List<DataSegment> segmentsForRealtime;
+  private ImmutableDruidDataSource dataSource1;
+  private ImmutableDruidDataSource dataSource2;
+  private ImmutableDruidDataSource dataSource2ForRealtime;
+  private ImmutableDruidDataSource broadcastDatasource;
+  private List<ImmutableDruidDataSource> dataSources;
+  private List<ImmutableDruidDataSource> dataSourcesForRealtime;
+  private Set<String> broadcastDatasourceNames;
+  private MetadataRuleManager databaseRuleManager;
+
+  @Before
+  public void setUp()
+  {
+    coordinator = EasyMock.createMock(DruidCoordinator.class);
+    historicalServer = EasyMock.createMock(ImmutableDruidServer.class);
+    historicalServerTier2 = EasyMock.createMock(ImmutableDruidServer.class);
+    brokerServer = EasyMock.createMock(ImmutableDruidServer.class);
+    indexerServer = EasyMock.createMock(ImmutableDruidServer.class);
+    segment1 = EasyMock.createMock(DataSegment.class);
+    segment2 = EasyMock.createMock(DataSegment.class);
+    databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
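+    // segment1 and segment2 are immediately reassigned to concrete DataSegment instances below,
+    // so the two createMock calls above are placeholders that are never replayed.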
+
+    DateTime start1 = DateTimes.of("2012-01-01");
+    DateTime start2 = DateTimes.of("2012-02-01");
+    DateTime version = DateTimes.of("2012-05-01");
+    segment1 = new DataSegment(
+        "datasource1",
+        new Interval(start1, start1.plusHours(1)),
+        version.toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        11L
+    );
+    segment2 = new DataSegment(
+        "datasource2",
+        new Interval(start1, start1.plusHours(1)),
+        version.toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        7L
+    );
+    realtimeOnlySegment = new DataSegment(
+        "datasource2",
+        new Interval(start2, start2.plusHours(1)),
+        version.toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        7L
+    );
+    broadcastSegment = new DataSegment(
+        "broadcastDatasource",
+        new Interval(start1, start1.plusHours(1)),
+        version.toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        7L
+    );
+
+    segments = new ArrayList<>();
+    segments.add(segment1);
+    segments.add(segment2);
+    segments.add(broadcastSegment);
+
+    segmentsForRealtime = new ArrayList<>();
+    segmentsForRealtime.add(realtimeOnlySegment);
+    segmentsForRealtime.add(broadcastSegment);
+
+    historicalPeon = new LoadQueuePeonTester();
+    historicalTier2Peon = new LoadQueuePeonTester();
+    brokerPeon = new LoadQueuePeonTester();
+    indexerPeon = new LoadQueuePeonTester();
+
+    dataSource1 = new ImmutableDruidDataSource(
+        "datasource1",
+        Collections.emptyMap(),
+        Collections.singleton(segment1)
+    );
+    dataSource2 = new ImmutableDruidDataSource(
+        "datasource2",
+        Collections.emptyMap(),
+        Collections.singleton(segment2)
+    );
+
+    broadcastDatasourceNames = Collections.singleton("broadcastDatasource");
+    broadcastDatasource = new ImmutableDruidDataSource(
+        "broadcastDatasource",
+        Collections.emptyMap(),
+        Collections.singleton(broadcastSegment)
+    );
+
+    dataSources = ImmutableList.of(dataSource1, dataSource2, broadcastDatasource);
+
+    // This simulates a task that is ingesting to an existing non-broadcast datasource, with unpublished segments,
+    // while also having a broadcast segment loaded.
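+    // The indexer thus reports a different view of datasource2 than the historicals and brokers:
+    // one unpublished segment (realtimeOnlySegment) plus the loaded broadcast segment.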
+    dataSource2ForRealtime = new ImmutableDruidDataSource(
+        "datasource2",
+        Collections.emptyMap(),
+        Collections.singleton(realtimeOnlySegment)
+    );
+    dataSourcesForRealtime = ImmutableList.of(dataSource2ForRealtime, broadcastDatasource);
+  }
+
+  @After
+  public void tearDown()
+  {
+    EasyMock.verify(coordinator);
+    EasyMock.verify(historicalServer);
+    EasyMock.verify(historicalServerTier2);
+    EasyMock.verify(brokerServer);
+    EasyMock.verify(indexerServer);
+    EasyMock.verify(databaseRuleManager);
+  }
+
+  @Test
+  public void test_unloadUnusedSegmentsFromAllServers()
+  {
+    mockDruidServer(
+        historicalServer,
+        ServerType.HISTORICAL,
+        "historical",
+        DruidServer.DEFAULT_TIER,
+        30L,
+        100L,
+        segments,
+        dataSources
+    );
+    mockDruidServer(
+        historicalServerTier2,
+        ServerType.HISTORICAL,
+        "historicalTier2",
+        "tier2",
+        30L,
+        100L,
+        segments,
+        dataSources
+    );
+    mockDruidServer(
+        brokerServer,
+        ServerType.BROKER,
+        "broker",
+        DruidServer.DEFAULT_TIER,
+        30L,
+        100L,
+        segments,
+        dataSources
+    );
+    mockDruidServer(
+        indexerServer,
+        ServerType.INDEXER_EXECUTOR,
+        "indexer",
+        DruidServer.DEFAULT_TIER,
+        30L,
+        100L,
+        segmentsForRealtime,
+        dataSourcesForRealtime
+    );
+
+    // Mock stuff that the coordinator needs
+    mockCoordinator(coordinator);
+
+    mockRuleManager(databaseRuleManager);
+
+    // We keep datasource2 segments only, drop datasource1 and broadcastDatasource from all servers.
+    // realtimeOnlySegment is intentionally missing from the set, to match how a realtime task's
+    // unpublished segments will not appear in the coordinator's view of used segments.
+    Set<DataSegment> usedSegments = ImmutableSet.of(segment2);
+
+    DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
+        .newBuilder()
+        .withDruidCluster(
+            DruidClusterBuilder
+                .newBuilder()
+                .addTier(
+                    DruidServer.DEFAULT_TIER,
+                    new ServerHolder(historicalServer, historicalPeon, false)
+                )
+                .addTier(
+                    "tier2",
+                    new ServerHolder(historicalServerTier2, historicalTier2Peon, false)
+                )
+                .withBrokers(
+                    new ServerHolder(brokerServer, brokerPeon, false)
+                )
+                .withRealtimes(
+                    new ServerHolder(indexerServer, indexerPeon, false)
+                )
+                .build()
+        )
+        .withLoadManagementPeons(
+            ImmutableMap.of(
+                "historical", historicalPeon,
+                "historicalTier2", historicalTier2Peon,
+                "broker", brokerPeon,
+                "indexer", indexerPeon
+            )
+        )
+        .withUsedSegmentsInTest(usedSegments)
+        .withBroadcastDatasources(broadcastDatasourceNames)
+        .withDatabaseRuleManager(databaseRuleManager)
+        .build();
+
+    params = new UnloadUnusedSegments().run(params);
+    CoordinatorStats stats = params.getCoordinatorStats();
+
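+    // Expected counts: default tier = 2 (historical: segment1 + broadcastSegment) + 2 (broker)
+    //   + 1 (indexer: broadcastSegment only, since dropBroadcastOnly applies) = 5; tier2 = 2.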
+    // We drop segment1 and broadcastSegment from all servers; realtimeOnlySegment is not dropped by the indexer.
+    Assert.assertEquals(5, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER));
+    Assert.assertEquals(2, stats.getTieredStat("unneededCount", "tier2"));
+  }
+
+  private static void mockDruidServer(
+      ImmutableDruidServer druidServer,
+      ServerType serverType,
+      String name,
+      String tier,
+      long currentSize,
+      long maxSize,
+      List<DataSegment> segments,
+      List<ImmutableDruidDataSource> dataSources
+  )
+  {
+    EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
+    EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
+    EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).anyTimes();
+    EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).anyTimes();
+    ImmutableDruidServerTests.expectSegments(druidServer, segments);
+    EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
+    EasyMock.expect(druidServer.getType()).andReturn(serverType).anyTimes();
+    EasyMock.expect(druidServer.getDataSources()).andReturn(dataSources).anyTimes();
+    if (!segments.isEmpty()) {
+      segments.forEach(
+          s -> EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes()
+      );
+    }
+    EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer);
+  }
+
+  private static void mockCoordinator(DruidCoordinator coordinator)
+  {
+    coordinator.moveSegment(
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(coordinator);
+  }
+
+  private static void mockRuleManager(MetadataRuleManager metadataRuleManager)
+  {
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource1")).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(
+                    DruidServer.DEFAULT_TIER, 1,
+                    "tier2", 1
+                )
+            )
+        )).anyTimes();
+
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource2")).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(
+                    DruidServer.DEFAULT_TIER, 1,
+                    "tier2", 1
+                )
+            )
+        )).anyTimes();
+
+    EasyMock.expect(metadataRuleManager.getRulesWithDefault("broadcastDatasource")).andReturn(
+        Collections.singletonList(
+            new ForeverBroadcastDistributionRule()
+        )).anyTimes();
+
+    EasyMock.replay(metadataRuleManager);
+  }
+}
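
(Closing sketch, not part of the patch: assigning a broadcast rule like the ones documented above through the Coordinator's rules endpoint, POST /druid/coordinator/v1/rules/{dataSourceName}, which is described in rule-configuration.md. Host, port, and datasource name are placeholders; java.net.http assumes Java 11+.)

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SetBroadcastRuleSketch
{
  public static void main(String[] args) throws Exception
  {
    // Coordinator rules endpoint; host, port, and datasource name are placeholders.
    URI rulesUri = URI.create("http://coordinator.example.com:8081/druid/coordinator/v1/rules/broadcastDatasource");

    // One forever-broadcast rule, in the post-patch shape (no colocatedDataSources).
    String payload = "[{\"type\": \"broadcastForever\"}]";

    HttpRequest request = HttpRequest.newBuilder(rulesUri)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println("coordinator responded: " + response.statusCode());
  }
}
```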