Add test and metrics for KillStalePendingSegments duty (#14951)

Changes:
- Add new metric `kill/pendingSegments/count` with dimension `dataSource`
- Add tests for `KillStalePendingSegments`
- Reduce no-op logs that spit out for each datasource even when no pending
segments have been deleted. This can get particularly noisy at low values of `indexingPeriod`.
- Refactor the code in `KillStalePendingSegments` for readability and add javadocs
This commit is contained in:
Kashif Faraz 2023-09-08 10:33:47 +05:30 committed by GitHub
parent f9cf500a69
commit 647686aee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 475 additions and 115 deletions

View File

@ -334,6 +334,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies|
|`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies|
|`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies|
|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|

View File

@ -20,7 +20,9 @@
package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import io.netty.util.SuppressForbidden;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -30,6 +32,7 @@ import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
@ -194,6 +197,34 @@ public final class DateTimes
&& ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
}
/**
* Returns the earlier of the two given dates. When passed a null and a non-null
* date, this method simply returns the non-null value.
*/
public static DateTime earlierOf(DateTime a, DateTime b)
{
// Put nulls last to select the smaller non-null value
if (Objects.compare(a, b, Ordering.natural().nullsLast()) < 0) {
return a;
} else {
return b;
}
}
/**
* Returns the later of the two given dates. When passed a null and a non-null
* date, this method simply returns the non-null value.
*/
public static DateTime laterOf(DateTime a, DateTime b)
{
// Put nulls first to select the bigger non-null value
if (Objects.compare(a, b, Comparators.naturalNullsFirst()) > 0) {
return a;
} else {
return b;
}
}
private DateTimes()
{
}

View File

@ -123,4 +123,34 @@ public class DateTimesTest
DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
);
}
@Test
public void testEarlierOf()
{
Assert.assertNull(DateTimes.earlierOf(null, null));
final DateTime jan14 = DateTimes.of("2013-01-14");
Assert.assertEquals(jan14, DateTimes.earlierOf(null, jan14));
Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, null));
Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan14));
final DateTime jan15 = DateTimes.of("2013-01-15");
Assert.assertEquals(jan14, DateTimes.earlierOf(jan15, jan14));
Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan15));
}
@Test
public void testLaterOf()
{
Assert.assertNull(DateTimes.laterOf(null, null));
final DateTime jan14 = DateTimes.of("2013-01-14");
Assert.assertEquals(jan14, DateTimes.laterOf(null, jan14));
Assert.assertEquals(jan14, DateTimes.laterOf(jan14, null));
Assert.assertEquals(jan14, DateTimes.laterOf(jan14, jan14));
final DateTime jan15 = DateTimes.of("2013-01-15");
Assert.assertEquals(jan15, DateTimes.laterOf(jan15, jan14));
Assert.assertEquals(jan15, DateTimes.laterOf(jan14, jan15));
}
}

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.utils.JvmUtils;
@ -77,19 +76,13 @@ public class CoordinatorDynamicConfig
private final Map<Dimension, String> validDebugDimensions;
/**
* Stale pending segments belonging to the data sources in this list are not killed by {@link
* Stale pending segments belonging to the data sources in this list are not killed by {@code
* KillStalePendingSegments}. In other words, segments in these data sources are "protected".
* <p>
* Pending segments are considered "stale" when their created_time is older than {@link
* KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
*/
private final Set<String> dataSourcesToNotKillStalePendingSegmentsIn;
/**
* The maximum number of segments that can be queued for loading to any given server.
*
* @see LoadQueuePeon
* @see org.apache.druid.server.coordinator.rules.LoadRule#run
*/
private final int maxSegmentsInNodeLoadingQueue;
private final boolean pauseCoordination;
@ -576,6 +569,12 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withDatasourcesToNotKillPendingSegmentsIn(Set<String> datasources)
{
this.dataSourcesToNotKillStalePendingSegmentsIn = datasources;
return this;
}
public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
{
this.killTaskSlotRatio = killTaskSlotRatio;

View File

@ -1,106 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.ArrayList;
import java.util.List;
public class KillStalePendingSegments implements CoordinatorDuty
{
private static final Logger log = new Logger(KillStalePendingSegments.class);
private static final Period KEEP_PENDING_SEGMENTS_OFFSET = new Period("P1D");
private final OverlordClient overlordClient;
@Inject
public KillStalePendingSegments(OverlordClient overlordClient)
{
this.overlordClient = overlordClient;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final List<DateTime> createdTimes = new ArrayList<>();
// Include one complete status so we can get the time of the last-created complete task. (The Overlord API returns
// complete tasks in descending order of created_date.)
final List<TaskStatusPlus> statuses =
ImmutableList.copyOf(FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true));
createdTimes.add(
statuses
.stream()
.filter(status -> status.getStatusCode() == null || !status.getStatusCode().isComplete())
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
);
final TaskStatusPlus completeTaskStatus =
statuses.stream()
.filter(status -> status != null && status.getStatusCode().isComplete())
.findFirst()
.orElse(null);
if (completeTaskStatus != null) {
createdTimes.add(completeTaskStatus.getCreatedTime());
}
createdTimes.sort(Comparators.naturalNullsFirst());
// There should be at least one createdTime because the current time is added to the 'createdTimes' list if there
// is no running/pending/waiting tasks.
Preconditions.checkState(!createdTimes.isEmpty(), "Failed to gather createdTimes of tasks");
// If there is no running/pending/waiting/complete tasks, stalePendingSegmentsCutoffCreationTime is
// (DateTimes.nowUtc() - KEEP_PENDING_SEGMENTS_OFFSET).
final DateTime stalePendingSegmentsCutoffCreationTime = createdTimes.get(0).minus(KEEP_PENDING_SEGMENTS_OFFSET);
for (String dataSource : params.getUsedSegmentsTimelinesPerDataSource().keySet()) {
if (!params.getCoordinatorDynamicConfig().getDataSourcesToNotKillStalePendingSegmentsIn().contains(dataSource)) {
final int pendingSegmentsKilled = FutureUtils.getUnchecked(
overlordClient.killPendingSegments(
dataSource,
new Interval(DateTimes.MIN, stalePendingSegmentsCutoffCreationTime)
),
true
);
log.info(
"Killed [%d] pendingSegments created until [%s] for dataSource[%s]",
pendingSegmentsKilled,
stalePendingSegmentsCutoffCreationTime,
dataSource
);
}
}
return params;
}
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Duty to kill stale pending segments which are not needed anymore. Pending segments
* are created when appending realtime or batch tasks allocate segments to build
* incremental indexes. Under normal operation, these pending segments get committed
* when the task completes and become regular segments. But in case of task failures,
* some pending segments might be left around and cause clutter in the metadata store.
* <p>
* While cleaning up, this duty ensures that the following pending segments are
* retained for at least {@link #DURATION_TO_RETAIN}:
* <ul>
* <li>Pending segments created by any active task (across all datasources)</li>
* <li>Pending segments created by the latest completed task (across all datasources)</li>
* </ul>
*/
public class KillStalePendingSegments implements CoordinatorDuty
{
private static final Logger log = new Logger(KillStalePendingSegments.class);
private static final Period DURATION_TO_RETAIN = new Period("P1D");
private final OverlordClient overlordClient;
@Inject
public KillStalePendingSegments(OverlordClient overlordClient)
{
this.overlordClient = overlordClient;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final Set<String> killDatasources = new HashSet<>(
params.getUsedSegmentsTimelinesPerDataSource().keySet()
);
killDatasources.removeAll(
params.getCoordinatorDynamicConfig()
.getDataSourcesToNotKillStalePendingSegmentsIn()
);
final DateTime minCreatedTime = getMinCreatedTimeToRetain();
for (String dataSource : killDatasources) {
int pendingSegmentsKilled = FutureUtils.getUnchecked(
overlordClient.killPendingSegments(
dataSource,
new Interval(DateTimes.MIN, minCreatedTime)
),
true
);
if (pendingSegmentsKilled > 0) {
log.info(
"Killed [%d] pendingSegments created before [%s] for datasource[%s].",
pendingSegmentsKilled, minCreatedTime, dataSource
);
params.getCoordinatorStats().add(
Stats.Kill.PENDING_SEGMENTS,
RowKey.of(Dimension.DATASOURCE, dataSource),
pendingSegmentsKilled
);
}
}
return params;
}
/**
* Computes the minimum created time of retainable pending segments. Any pending
* segment created before this time is considered stale and can be safely deleted.
* The limit is determined to ensure that pending segments created by any active
* task and the latest completed task (across all datasources) are retained for
* at least {@link #DURATION_TO_RETAIN}.
*/
private DateTime getMinCreatedTimeToRetain()
{
// Fetch the statuses of all active tasks and the latest completed task
// (The Overlord API returns complete tasks in descending order of created_date.)
final List<TaskStatusPlus> statuses = ImmutableList.copyOf(
FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)
);
DateTime earliestActiveTaskStart = DateTimes.nowUtc();
DateTime latestCompletedTaskStart = null;
for (TaskStatusPlus status : statuses) {
if (status.getStatusCode() == null) {
// Unknown status
} else if (status.getStatusCode().isComplete()) {
latestCompletedTaskStart = DateTimes.laterOf(
latestCompletedTaskStart,
status.getCreatedTime()
);
} else {
earliestActiveTaskStart = DateTimes.earlierOf(
earliestActiveTaskStart,
status.getCreatedTime()
);
}
}
return DateTimes.earlierOf(latestCompletedTaskStart, earliestActiveTaskStart)
.minus(DURATION_TO_RETAIN);
}
}

View File

@ -147,6 +147,8 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
public static final CoordinatorStat SUBMITTED_TASKS
= CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
public static final CoordinatorStat PENDING_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count");
}
public static class Balancer

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class KillStalePendingSegmentsTest
{
private TestOverlordClient overlordClient;
private KillStalePendingSegments killDuty;
@Before
public void setup()
{
this.overlordClient = new TestOverlordClient();
this.killDuty = new KillStalePendingSegments(overlordClient);
}
@Test
public void testRetentionStarts1DayBeforeNowWhenNoKnownTask()
{
DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
killDuty.run(params);
final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
// Verify that the cutoff time is no later than 1 day ago from now
DateTime expectedCutoffTime = DateTimes.nowUtc().minusDays(1);
Assert.assertTrue(
expectedCutoffTime.getMillis() - observedKillInterval.getEnd().getMillis() <= 100
);
}
@Test
public void testRetentionStarts1DayBeforeEarliestActiveTask()
{
final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask, TaskState.RUNNING);
overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING);
overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING);
overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING);
DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
killDuty.run(params);
final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
Assert.assertEquals(startOfEarliestActiveTask.minusDays(1), observedKillInterval.getEnd());
}
@Test
public void testRetentionStarts1DayBeforeLatestCompletedTask()
{
final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(2), TaskState.FAILED);
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
killDuty.run(params);
final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
Assert.assertEquals(startOfLatestCompletedTask.minusDays(1), observedKillInterval.getEnd());
final CoordinatorRunStats stats = params.getCoordinatorStats();
Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
}
@Test
public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask()
{
final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01");
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);
final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, TaskState.RUNNING);
DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA).build();
killDuty.run(params);
DateTime earliestEligibleTask = DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask);
final Interval wikiKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart());
Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd());
final Interval koalaKillInterval = overlordClient.observedKillIntervals.get(DS.KOALA);
Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart());
Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd());
}
@Test
public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted()
{
DruidCoordinatorRuntimeParams params =
createParamsWithDatasources(DS.WIKI, DS.KOALA).withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
.withDatasourcesToNotKillPendingSegmentsIn(
Collections.singleton(DS.KOALA)
)
.build()
).build();
DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask, TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
killDuty.run(params);
// Verify that stale pending segments are killed in "wiki" but not in "koala"
final CoordinatorRunStats stats = params.getCoordinatorStats();
Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI));
Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA));
Assert.assertEquals(0, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.KOALA)));
}
private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources)
{
DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
// Create a dummy for each of the datasources so that they get added to the timeline
Set<DataSegment> usedSegments = new HashSet<>();
for (String datasource : datasources) {
usedSegments.add(
DataSegment.builder().dataSource(datasource).interval(Intervals.ETERNITY)
.version("v1").shardSpec(new NumberedShardSpec(0, 1)).size(100).build()
);
}
return builder.withUsedSegments(usedSegments);
}
private static class DS
{
static final String WIKI = "wiki";
static final String KOALA = "koala";
}
/**
* Simulates an Overlord with a configurable list of tasks and pending segments.
*/
private static class TestOverlordClient extends NoopOverlordClient
{
private final List<TaskStatusPlus> taskStatuses = new ArrayList<>();
private final Map<String, List<DateTime>> datasourceToPendingSegments = new HashMap<>();
private final Map<String, Interval> observedKillIntervals = new HashMap<>();
private int taskIdSuffix = 0;
void addTaskAndSegment(String datasource, DateTime createdTime, TaskState state)
{
taskStatuses.add(
new TaskStatusPlus(
datasource + "__" + taskIdSuffix++,
null, null, createdTime, createdTime, state,
state.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING,
100L, TaskLocation.unknown(), datasource, null
)
);
// Add a pending segment with created time 5 minutes after the task was created
datasourceToPendingSegments.computeIfAbsent(datasource, ds -> new ArrayList<>())
.add(createdTime.plusMinutes(5));
}
@Override
public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
@Nullable String state,
@Nullable String dataSource,
@Nullable Integer maxCompletedTasks
)
{
return Futures.immediateFuture(
CloseableIterators.wrap(taskStatuses.iterator(), null)
);
}
@Override
public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval)
{
observedKillIntervals.put(dataSource, interval);
List<DateTime> pendingSegments = datasourceToPendingSegments.remove(dataSource);
if (pendingSegments == null || pendingSegments.isEmpty()) {
return Futures.immediateFuture(0);
}
List<DateTime> remainingPendingSegments = new ArrayList<>();
int numDeletedPendingSegments = 0;
for (DateTime createdTime : pendingSegments) {
if (createdTime.isBefore(interval.getEnd())) {
++numDeletedPendingSegments;
} else {
remainingPendingSegments.add(createdTime);
}
}
if (remainingPendingSegments.size() > 0) {
datasourceToPendingSegments.put(dataSource, remainingPendingSegments);
}
return Futures.immediateFuture(numDeletedPendingSegments);
}
}
}

View File

@ -78,7 +78,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.audit.AuditManagerProvider;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
@ -91,6 +90,7 @@ import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;