Fix broadcast rule drop and docs (#10019)

* Fix broadcast rule drop and docs

* Remove racy test check

* Don't drop non-broadcast segments on tasks, add overshadowing handling

* Don't use realtimes for overshadowing

* Fix dropping for ingestion services
Jonathan Wei 2020-06-12 02:33:28 -07:00 committed by GitHub
parent 67669b4ad4
commit fe2f656427
6 changed files with 521 additions and 51 deletions

Changed: rule configuration documentation (broadcast rules section)

@@ -170,8 +170,9 @@ The interval of a segment will be compared against the specified period. The per
## Broadcast Rules
-Broadcast rules indicate how segments of different datasources should be co-located in Historical processes.
-Once a broadcast rule is configured for a datasource, all segments of the datasource are broadcasted to the servers holding _any segments_ of the co-located datasources.
+Broadcast rules indicate that segments of a data source should be loaded by all servers of a cluster of the following types: historicals, brokers, tasks, and indexers.
+Note that the broadcast segments are only directly queryable through the historicals, but they are currently loaded on other server types to support join queries.
### Forever Broadcast Rule
@@ -179,13 +180,13 @@ Forever broadcast rules are of the form:
```json
{
-"type" : "broadcastForever",
-"colocatedDataSources" : [ "target_source1", "target_source2" ]
+"type" : "broadcastForever"
}
```
* `type` - this should always be "broadcastForever"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
+This rule applies to all segments of a datasource, covering all intervals.
### Interval Broadcast Rule
@@ -194,13 +195,11 @@ Interval broadcast rules are of the form:
```json
{
"type" : "broadcastByInterval",
-"colocatedDataSources" : [ "target_source1", "target_source2" ],
"interval" : "2012-01-01/2013-01-01"
}
```
* `type` - this should always be "broadcastByInterval"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
* `interval` - A JSON Object representing ISO-8601 Periods. Only the segments of the interval will be broadcasted.
### Period Broadcast Rule
@@ -210,22 +209,17 @@ Period broadcast rules are of the form:
```json
{
"type" : "broadcastByPeriod",
-"colocatedDataSources" : [ "target_source1", "target_source2" ],
"period" : "P1M",
"includeFuture" : true
}
```
* `type` - this should always be "broadcastByPeriod"
-* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
* `period` - A JSON Object representing ISO-8601 Periods
* `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional, Default is true.
The interval of a segment will be compared against the specified period. The period is from some time in the past to the future or to the current time, which depends on `includeFuture` is true or false. The rule matches if the period *overlaps* the interval.
-> broadcast rules don't guarantee that segments of the datasources are always co-located because segments for the colocated datasources are not loaded together atomically.
-> If you want to always co-locate the segments of some datasources together, it is recommended to leave colocatedDataSources empty.
## Permanently deleting data
Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any
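
The period-matching behavior referenced in the context lines above (a window derived from the period, `includeFuture` controlling whether it extends past the current time, and an *overlap* check against the segment interval) can be sketched with plain Joda-Time. This is a hypothetical illustration with made-up class and method names, not the rule implementation itself:

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;

// Hypothetical sketch of how a "broadcastByPeriod" rule's matching window relates to a
// segment interval. Druid's actual Rule classes implement this differently.
public class PeriodRuleMatchSketch
{
  static boolean matches(Interval segmentInterval, Period period, boolean includeFuture, DateTime now)
  {
    // The rule window starts "period" before now. With includeFuture=true it extends far into
    // the future; otherwise it ends at the current time.
    DateTime windowEnd = includeFuture ? now.plusYears(1000) : now;
    Interval window = new Interval(now.minus(period), windowEnd);
    // The rule matches when the window overlaps the segment's interval.
    return window.overlaps(segmentInterval);
  }

  public static void main(String[] args)
  {
    DateTime now = DateTime.now(DateTimeZone.UTC);
    Interval lastWeek = new Interval(now.minusDays(8), now.minusDays(7));
    Interval nextWeek = new Interval(now.plusDays(7), now.plusDays(8));
    System.out.println(matches(lastWeek, Period.parse("P1M"), false, now)); // true: within the past month
    System.out.println(matches(nextWeek, Period.parse("P1M"), false, now)); // false: entirely in the future
    System.out.println(matches(nextWeek, Period.parse("P1M"), true, now));  // true: future included
  }
}
```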

Changed: MarkAsUnusedOvershadowedSegments.java

@@ -58,19 +58,17 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
-ImmutableDruidServer server = serverHolder.getServer();
-for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-VersionedIntervalTimeline<String, DataSegment> timeline = timelines
-.computeIfAbsent(
-dataSource.getName(),
-dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
-);
-VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
-}
+addSegmentsFromServer(serverHolder, timelines);
}
}
+for (ServerHolder serverHolder : cluster.getBrokers()) {
+addSegmentsFromServer(serverHolder, timelines);
+}
+// Note that we do not include segments from ingestion services such as tasks or indexers,
+// to prevent unpublished segments from prematurely overshadowing segments.
// Mark all segments as unused in db that are overshadowed by served segments
for (DataSegment dataSegment : params.getUsedSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
@@ -83,4 +81,21 @@ public class MarkAsUnusedOvershadowedSegments implements CoordinatorDuty
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
+private void addSegmentsFromServer(
+ServerHolder serverHolder,
+Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines
+)
+{
+ImmutableDruidServer server = serverHolder.getServer();
+for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+VersionedIntervalTimeline<String, DataSegment> timeline = timelines
+.computeIfAbsent(
+dataSource.getName(),
+dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
+);
+VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
+}
+}
}
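
For context, the overshadow check that follows this hunk (only partially visible above) consumes the per-datasource timelines built by `addSegmentsFromServer`, which now include brokers but deliberately exclude ingestion services. A hedged sketch of that flow, assuming a `VersionedIntervalTimeline.isOvershadowed(interval, version, segment)` method is available as in the surrounding class; the real duty then marks the returned segments as unused via the coordinator:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;

// Hedged sketch: given timelines built from served segments (historicals and brokers only),
// collect the used segments that those timelines overshadow. Not the duty itself.
public class OvershadowCheckSketch
{
  static List<DataSegment> findOvershadowed(
      Collection<DataSegment> usedSegments,
      Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines
  )
  {
    List<DataSegment> overshadowed = new ArrayList<>();
    for (DataSegment segment : usedSegments) {
      VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
      // A segment is overshadowed when newer-version served segments fully cover its interval.
      if (timeline != null && timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment)) {
        overshadowed.add(segment);
      }
    }
    return overshadowed;
  }
}
```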

Changed: RunRules.java

@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.Lists;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
@@ -103,7 +104,21 @@ public class RunRules implements CoordinatorDuty
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
final Set<String> broadcastDatasources = new HashSet<>();
+for (ImmutableDruidDataSource dataSource : params.getDataSourcesSnapshot().getDataSourcesMap().values()) {
+List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSource.getName());
+for (Rule rule : rules) {
+// A datasource is considered a broadcast datasource if it has any broadcast rules.
+// The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
+// executes before BalanceSegments.
+if (rule instanceof BroadcastDistributionRule) {
+broadcastDatasources.add(dataSource.getName());
+break;
+}
+}
+}
for (DataSegment segment : params.getUsedSegments()) {
if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
@@ -115,12 +130,6 @@ public class RunRules implements CoordinatorDuty
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
foundMatchingRule = true;
-// The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
-// executes before BalanceSegments
-if (rule instanceof BroadcastDistributionRule) {
-broadcastDatasources.add(segment.getDataSource());
-}
break;
}
}
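
The change above moves broadcast-datasource detection out of the per-segment rule loop: instead of flagging a datasource only when a broadcast rule actually fires for one of its used segments, RunRules now scans every datasource's rule chain up front. A minimal standalone sketch of that classification, using a hypothetical helper and a plain rules map in place of the snapshot and MetadataRuleManager:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;

// Hedged sketch: classify datasources as "broadcast" by checking whether any of their
// rules is a BroadcastDistributionRule, mirroring the loop added to RunRules above.
public class BroadcastDatasourceClassifierSketch
{
  static Set<String> findBroadcastDatasources(Map<String, List<Rule>> rulesByDatasource)
  {
    Set<String> broadcastDatasources = new HashSet<>();
    for (Map.Entry<String, List<Rule>> entry : rulesByDatasource.entrySet()) {
      for (Rule rule : entry.getValue()) {
        // One broadcast rule anywhere in the chain is enough to treat the datasource as broadcast.
        if (rule instanceof BroadcastDistributionRule) {
          broadcastDatasources.add(entry.getKey());
          break;
        }
      }
    }
    return broadcastDatasources;
  }
}
```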

Changed: UnloadUnusedSegments.java

@@ -27,13 +27,18 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
/**
-* Unloads segments that are no longer marked as used from Historical servers.
+* Unloads segments that are no longer marked as used from servers.
*/
public class UnloadUnusedSegments implements CoordinatorDuty
{
@@ -46,31 +51,102 @@ public class UnloadUnusedSegments implements CoordinatorDuty
Set<DataSegment> usedSegments = params.getUsedSegments();
DruidCluster cluster = params.getDruidCluster();
+Map<String, Boolean> broadcastStatusByDatasource = new HashMap<>();
+for (String broadcastDatasource : params.getBroadcastDatasources()) {
+broadcastStatusByDatasource.put(broadcastDatasource, true);
+}
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
-ImmutableDruidServer server = serverHolder.getServer();
-for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-for (DataSegment segment : dataSource.getSegments()) {
-if (!usedSegments.contains(segment)) {
-LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
-if (!queuePeon.getSegmentsToDrop().contains(segment)) {
-queuePeon.dropSegment(segment, () -> {});
-stats.addToTieredStat("unneededCount", server.getTier(), 1);
-log.info(
-"Dropping uneeded segment [%s] from server [%s] in tier [%s]",
-segment.getId(),
-server.getName(),
-server.getTier()
-);
-}
-}
-}
-}
+handleUnusedSegmentsForServer(
+serverHolder,
+usedSegments,
+params,
+stats,
+false,
+broadcastStatusByDatasource
+);
}
}
+for (ServerHolder serverHolder : cluster.getBrokers()) {
+handleUnusedSegmentsForServer(
+serverHolder,
+usedSegments,
+params,
+stats,
+false,
+broadcastStatusByDatasource
+);
+}
+for (ServerHolder serverHolder : cluster.getRealtimes()) {
+handleUnusedSegmentsForServer(
+serverHolder,
+usedSegments,
+params,
+stats,
+true,
+broadcastStatusByDatasource
+);
+}
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
+private void handleUnusedSegmentsForServer(
+ServerHolder serverHolder,
+Set<DataSegment> usedSegments,
+DruidCoordinatorRuntimeParams params,
+CoordinatorStats stats,
+boolean dropBroadcastOnly,
+Map<String, Boolean> broadcastStatusByDatasource
+)
+{
+ImmutableDruidServer server = serverHolder.getServer();
+for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent(
+dataSource.getName(),
+(dataSourceName) -> {
+List<Rule> rules = params.getDatabaseRuleManager().getRulesWithDefault(dataSource.getName());
+for (Rule rule : rules) {
+// A datasource is considered a broadcast datasource if it has any broadcast rules.
+if (rule instanceof BroadcastDistributionRule) {
+return true;
+}
+}
+return false;
+}
+);
+// The coordinator tracks used segments by examining the metadata store.
+// For tasks, the segments they create are unpublished, so those segments will get dropped
+// unless we exclude them here. We currently drop only broadcast segments in that case.
+// This check relies on the assumption that queryable stream tasks will never
+// ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
+// datasource, this will result in the those segments not being dropped from tasks.
+// A more robust solution which requires a larger rework could be to expose
+// the set of segments that were created by a task/indexer here, and exclude them.
+if (dropBroadcastOnly && !isBroadcastDatasource) {
+continue;
+}
+for (DataSegment segment : dataSource.getSegments()) {
+if (!usedSegments.contains(segment)) {
+LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
+if (!queuePeon.getSegmentsToDrop().contains(segment)) {
+queuePeon.dropSegment(segment, () -> {});
+stats.addToTieredStat("unneededCount", server.getTier(), 1);
+log.info(
+"Dropping uneeded segment [%s] from server [%s] in tier [%s]",
+segment.getId(),
+server.getName(),
+server.getTier()
+);
+}
+}
+}
+}
+}
}
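
The long comment in `handleUnusedSegmentsForServer` is the crux of the fix: on historicals and brokers every unused segment is dropped, while on realtime servers (tasks and indexers) only segments of broadcast datasources are dropped, because task-created segments are not yet published and therefore never appear in the coordinator's used-segment set. A minimal decision sketch of that rule, using a hypothetical helper with booleans in place of the real ServerHolder, LoadQueuePeon, and rule lookups:

```java
// Hedged sketch of the drop decision described in the comment above; not the duty itself.
public class DropDecisionSketch
{
  static boolean shouldDrop(boolean segmentIsUsed, boolean serverIsRealtime, boolean datasourceIsBroadcast)
  {
    if (segmentIsUsed) {
      return false;                 // never drop segments that are still marked as used
    }
    if (serverIsRealtime) {
      return datasourceIsBroadcast; // tasks/indexers: only broadcast segments are dropped
    }
    return true;                    // historicals/brokers: drop anything unused
  }

  public static void main(String[] args)
  {
    System.out.println(shouldDrop(false, true, false));  // false: unpublished task segment is kept
    System.out.println(shouldDrop(false, true, true));   // true: stale broadcast segment on a task is dropped
    System.out.println(shouldDrop(false, false, false)); // true: unused segment on a historical is dropped
  }
}
```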

Changed: MarkAsUnusedOvershadowedSegmentsTest.java

@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
@@ -39,9 +41,11 @@ import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
+import org.junit.runner.RunWith;
import java.util.List;
+@RunWith(JUnitParamsRunner.class)
public class MarkAsUnusedOvershadowedSegmentsTest
{
MarkAsUnusedOvershadowedSegments markAsUnusedOvershadowedSegments;
@@ -69,8 +73,16 @@ public class MarkAsUnusedOvershadowedSegmentsTest
.build();
@Test
-public void testRun()
+@Parameters(
+{
+"historical",
+"broker"
+}
+)
+public void testRun(String serverTypeString)
{
+ServerType serverType = ServerType.fromString(serverTypeString);
markAsUnusedOvershadowedSegments =
new MarkAsUnusedOvershadowedSegments(coordinator);
usedSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
@@ -95,7 +107,7 @@ public class MarkAsUnusedOvershadowedSegmentsTest
.andReturn("")
.anyTimes();
EasyMock.expect(druidServer.getType())
-.andReturn(ServerType.HISTORICAL)
+.andReturn(serverType)
.anyTimes();
EasyMock.expect(druidServer.getDataSources())

Added: UnloadUnusedSegmentsTest.java (new file)

@@ -0,0 +1,364 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidClusterBuilder;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
public class UnloadUnusedSegmentsTest
{
private DruidCoordinator coordinator;
private ImmutableDruidServer historicalServer;
private ImmutableDruidServer historicalServerTier2;
private ImmutableDruidServer brokerServer;
private ImmutableDruidServer indexerServer;
private LoadQueuePeonTester historicalPeon;
private LoadQueuePeonTester historicalTier2Peon;
private LoadQueuePeonTester brokerPeon;
private LoadQueuePeonTester indexerPeon;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment broadcastSegment;
private DataSegment realtimeOnlySegment;
private List<DataSegment> segments;
private List<DataSegment> segmentsForRealtime;
private ImmutableDruidDataSource dataSource1;
private ImmutableDruidDataSource dataSource2;
private ImmutableDruidDataSource dataSource2ForRealtime;
private ImmutableDruidDataSource broadcastDatasource;
private List<ImmutableDruidDataSource> dataSources;
private List<ImmutableDruidDataSource> dataSourcesForRealtime;
private Set<String> broadcastDatasourceNames;
private MetadataRuleManager databaseRuleManager;
@Before
public void setUp()
{
coordinator = EasyMock.createMock(DruidCoordinator.class);
historicalServer = EasyMock.createMock(ImmutableDruidServer.class);
historicalServerTier2 = EasyMock.createMock(ImmutableDruidServer.class);
brokerServer = EasyMock.createMock(ImmutableDruidServer.class);
indexerServer = EasyMock.createMock(ImmutableDruidServer.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");
DateTime version = DateTimes.of("2012-05-01");
segment1 = new DataSegment(
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
11L
);
segment2 = new DataSegment(
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
7L
);
realtimeOnlySegment = new DataSegment(
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
7L
);
broadcastSegment = new DataSegment(
"broadcastDatasource",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
7L
);
segments = new ArrayList<>();
segments.add(segment1);
segments.add(segment2);
segments.add(broadcastSegment);
segmentsForRealtime = new ArrayList<>();
segmentsForRealtime.add(realtimeOnlySegment);
segmentsForRealtime.add(broadcastSegment);
historicalPeon = new LoadQueuePeonTester();
historicalTier2Peon = new LoadQueuePeonTester();
brokerPeon = new LoadQueuePeonTester();
indexerPeon = new LoadQueuePeonTester();
dataSource1 = new ImmutableDruidDataSource(
"datasource1",
Collections.emptyMap(),
Collections.singleton(segment1)
);
dataSource2 = new ImmutableDruidDataSource(
"datasource2",
Collections.emptyMap(),
Collections.singleton(segment2)
);
broadcastDatasourceNames = Collections.singleton("broadcastDatasource");
broadcastDatasource = new ImmutableDruidDataSource(
"broadcastDatasource",
Collections.emptyMap(),
Collections.singleton(broadcastSegment)
);
dataSources = ImmutableList.of(dataSource1, dataSource2, broadcastDatasource);
// This simulates a task that is ingesting to an existing non-broadcast datasource, with unpublished segments,
// while also having a broadcast segment loaded.
dataSource2ForRealtime = new ImmutableDruidDataSource(
"datasource2",
Collections.emptyMap(),
Collections.singleton(realtimeOnlySegment)
);
dataSourcesForRealtime = ImmutableList.of(dataSource2ForRealtime, broadcastDatasource);
}
@After
public void tearDown()
{
EasyMock.verify(coordinator);
EasyMock.verify(historicalServer);
EasyMock.verify(historicalServerTier2);
EasyMock.verify(brokerServer);
EasyMock.verify(indexerServer);
EasyMock.verify(databaseRuleManager);
}
@Test
public void test_unloadUnusedSegmentsFromAllServers()
{
mockDruidServer(
historicalServer,
ServerType.HISTORICAL,
"historical",
DruidServer.DEFAULT_TIER,
30L,
100L,
segments,
dataSources
);
mockDruidServer(
historicalServerTier2,
ServerType.HISTORICAL,
"historicalTier2",
"tier2",
30L,
100L,
segments,
dataSources
);
mockDruidServer(
brokerServer,
ServerType.BROKER,
"broker",
DruidServer.DEFAULT_TIER,
30L,
100L,
segments,
dataSources
);
mockDruidServer(
indexerServer,
ServerType.INDEXER_EXECUTOR,
"indexer",
DruidServer.DEFAULT_TIER,
30L,
100L,
segmentsForRealtime,
dataSourcesForRealtime
);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
mockRuleManager(databaseRuleManager);
// We keep datasource2 segments only, drop datasource1 and broadcastDatasource from all servers
// realtimeSegment is intentionally missing from the set, to match how a realtime tasks's unpublished segments
// will not appear in the coordinator's view of used segments.
Set<DataSegment> usedSegments = ImmutableSet.of(segment2);
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(
DruidClusterBuilder
.newBuilder()
.addTier(
DruidServer.DEFAULT_TIER,
new ServerHolder(historicalServer, historicalPeon, false)
)
.addTier(
"tier2",
new ServerHolder(historicalServerTier2, historicalTier2Peon, false)
)
.withBrokers(
new ServerHolder(brokerServer, brokerPeon, false)
)
.withRealtimes(
new ServerHolder(indexerServer, indexerPeon, false)
)
.build()
)
.withLoadManagementPeons(
ImmutableMap.of(
"historical", historicalPeon,
"historicalTier2", historicalTier2Peon,
"broker", brokerPeon,
"indexer", indexerPeon
)
)
.withUsedSegmentsInTest(usedSegments)
.withBroadcastDatasources(broadcastDatasourceNames)
.withDatabaseRuleManager(databaseRuleManager)
.build();
params = new UnloadUnusedSegments().run(params);
CoordinatorStats stats = params.getCoordinatorStats();
// We drop segment1 and broadcast1 from all servers, realtimeSegment is not dropped by the indexer
Assert.assertEquals(5, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER));
Assert.assertEquals(2, stats.getTieredStat("unneededCount", "tier2"));
}
private static void mockDruidServer(
ImmutableDruidServer druidServer,
ServerType serverType,
String name,
String tier,
long currentSize,
long maxSize,
List<DataSegment> segments,
List<ImmutableDruidDataSource> dataSources
)
{
EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).anyTimes();
ImmutableDruidServerTests.expectSegments(druidServer, segments);
EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getType()).andReturn(serverType).anyTimes();
EasyMock.expect(druidServer.getDataSources()).andReturn(dataSources).anyTimes();
if (!segments.isEmpty()) {
segments.forEach(
s -> EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes()
);
}
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer);
}
private static void mockCoordinator(DruidCoordinator coordinator)
{
coordinator.moveSegment(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
private static void mockRuleManager(MetadataRuleManager metadataRuleManager)
{
EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource1")).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(
DruidServer.DEFAULT_TIER, 1,
"tier2", 1
)
)
)).anyTimes();
EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource2")).andReturn(
Collections.singletonList(
new ForeverLoadRule(
ImmutableMap.of(
DruidServer.DEFAULT_TIER, 1,
"tier2", 1
)
)
)).anyTimes();
EasyMock.expect(metadataRuleManager.getRulesWithDefault("broadcastDatasource")).andReturn(
Collections.singletonList(
new ForeverBroadcastDistributionRule()
)).anyTimes();
EasyMock.replay(metadataRuleManager);
}
}