Add a TaskReport for "kill" tasks (#15023)

- Add `KillTaskReport` that contains stats for `numSegmentsKilled`,
`numBatchesProcessed`, `numSegmentsMarkedAsUnused`
- Fix bug where exception message had no formatter but was was still being passed some args.
- Add some comments regarding deprecation of `markAsUnused` flag.
This commit is contained in:
Kashif Faraz 2023-09-23 07:44:27 +05:30 committed by GitHub
parent be3f93e3cf
commit d7c152c82c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 237 additions and 57 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common; package org.apache.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects; import java.util.Objects;
@ -28,11 +29,12 @@ public class IngestionStatsAndErrorsTaskReport implements TaskReport
public static final String REPORT_KEY = "ingestionStatsAndErrors"; public static final String REPORT_KEY = "ingestionStatsAndErrors";
@JsonProperty @JsonProperty
private String taskId; private final String taskId;
@JsonProperty @JsonProperty
private IngestionStatsAndErrorsTaskReportData payload; private final IngestionStatsAndErrorsTaskReportData payload;
@JsonCreator
public IngestionStatsAndErrorsTaskReport( public IngestionStatsAndErrorsTaskReport(
@JsonProperty("taskId") String taskId, @JsonProperty("taskId") String taskId,
@JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload @JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.Objects;
public class KillTaskReport implements TaskReport
{
public static final String REPORT_KEY = "killUnusedSegments";
private final String taskId;
private final Stats stats;
@JsonCreator
public KillTaskReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("payload") Stats stats
)
{
this.taskId = taskId;
this.stats = stats;
}
@Override
@JsonProperty
public String getTaskId()
{
return taskId;
}
@Override
public String getReportKey()
{
return REPORT_KEY;
}
@Override
@JsonProperty
public Object getPayload()
{
return stats;
}
public static class Stats
{
private final int numSegmentsKilled;
private final int numBatchesProcessed;
private final Integer numSegmentsMarkedAsUnused;
@JsonCreator
public Stats(
@JsonProperty("numSegmentsKilled") int numSegmentsKilled,
@JsonProperty("numBatchesProcessed") int numBatchesProcessed,
@JsonProperty("numSegmentsMarkedAsUnused") @Nullable Integer numSegmentsMarkedAsUnused
)
{
this.numSegmentsKilled = numSegmentsKilled;
this.numBatchesProcessed = numBatchesProcessed;
this.numSegmentsMarkedAsUnused = numSegmentsMarkedAsUnused;
}
@JsonProperty
public int getNumSegmentsKilled()
{
return numSegmentsKilled;
}
@JsonProperty
public int getNumBatchesProcessed()
{
return numBatchesProcessed;
}
@Nullable
@JsonProperty
public Integer getNumSegmentsMarkedAsUnused()
{
return numSegmentsMarkedAsUnused;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Stats that = (Stats) o;
return numSegmentsKilled == that.numSegmentsKilled
&& numBatchesProcessed == that.numBatchesProcessed
&& Objects.equals(this.numSegmentsMarkedAsUnused, that.numSegmentsMarkedAsUnused);
}
@Override
public int hashCode()
{
return Objects.hash(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
}
@Override
public String toString()
{
return "Stats{" +
"numSegmentsKilled=" + numSegmentsKilled +
", numBatchesProcessed=" + numBatchesProcessed +
", numSegmentsMarkedAsUnused=" + numSegmentsMarkedAsUnused +
'}';
}
}
}

View File

@ -31,7 +31,8 @@ import java.util.Map;
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = IngestionStatsAndErrorsTaskReport.class) @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = IngestionStatsAndErrorsTaskReport.class),
@JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = KillTaskReport.class)
}) })
public interface TaskReport public interface TaskReport
{ {

View File

@ -24,12 +24,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput; import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction; import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
@ -46,7 +47,6 @@ import org.joda.time.Interval;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -61,6 +61,8 @@ import java.util.stream.Collectors;
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}. * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link * JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields. * ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
* <p>
* The field {@link #isMarkAsUnused()} is now deprecated.
*/ */
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{ {
@ -77,19 +79,19 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
*/ */
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100; private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
@Deprecated
private final boolean markAsUnused; private final boolean markAsUnused;
/** /**
* Split processing to try and keep each nuke operation relatively short, in the case that either * Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow. * the database or the storage layer is particularly slow.
*/ */
private final int batchSize; private final int batchSize;
/**
* Maximum number of segments that can be killed.
*/
@Nullable private final Integer limit; @Nullable private final Integer limit;
// counters included primarily for testing
private int numSegmentsKilled = 0;
private long numBatchesProcessed = 0;
@JsonCreator @JsonCreator
public KillUnusedSegmentsTask( public KillUnusedSegmentsTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@ -109,22 +111,26 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
); );
this.markAsUnused = markAsUnused != null && markAsUnused; this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE; this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero"); if (this.batchSize <= 0) {
if (null != limit && limit <= 0) { throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit);
throw InvalidInput.exception(
"limit [%d] is invalid. It must be a positive integer.",
limit
);
} }
if (limit != null && markAsUnused != null && markAsUnused) { if (limit != null && limit <= 0) {
throw InvalidInput.exception( throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit);
"limit cannot be provided with markAsUnused.", }
limit if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
); throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled.");
} }
this.limit = limit; this.limit = limit;
} }
/**
* This field has been deprecated as "kill" tasks should not be responsible for
* marking segments as unused. Instead, users should call the Coordinator API
* {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
* Segments may also be marked unused by the Coordinator if they become overshadowed
* or have a {@code DropRule} applied to them.
*/
@Deprecated
@JsonProperty @JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isMarkAsUnused() public boolean isMarkAsUnused()
@ -160,30 +166,22 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return ImmutableSet.of(); return ImmutableSet.of();
} }
@JsonIgnore
@VisibleForTesting
long getNumBatchesProcessed()
{
return numBatchesProcessed;
}
@JsonIgnore
@VisibleForTesting
long getNumSegmentsKilled()
{
return numSegmentsKilled;
}
@Override @Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{ {
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient()); final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());
// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
final int numSegmentsMarkedAsUnused;
if (markAsUnused) { if (markAsUnused) {
int numMarked = toolbox.getTaskActionClient().submit( numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval()) new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
); );
LOG.info("Marked %d segments as unused.", numMarked); LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
} else {
numSegmentsMarkedAsUnused = 0;
} }
// List unused segments // List unused segments
@ -194,7 +192,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s", "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
batchSize, batchSize,
limit, limit,
numTotalBatches != null ? StringUtils.format(" in([%d] batches]).", numTotalBatches) : "." numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
); );
do { do {
if (nextBatchSize <= 0) { if (nextBatchSize <= 0) {
@ -229,16 +227,21 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
nextBatchSize = computeNextBatchSize(numSegmentsKilled); nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)); } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments " final String taskId = getId();
+ "in [%d] batches.", LOG.info(
getId(), "Finished kill task[%s] for dataSource[%s] and interval[%s]."
getDataSource(), + " Deleted total [%d] unused segments in [%d] batches.",
getInterval(), taskId, getDataSource(), getInterval(), numSegmentsKilled, numBatchesProcessed
numSegmentsKilled,
numBatchesProcessed
); );
return TaskStatus.success(getId()); final KillTaskReport.Stats stats =
new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
toolbox.getTaskReportFileWriter().write(
taskId,
TaskReport.buildTaskReports(new KillTaskReport(taskId, stats))
);
return TaskStatus.success(taskId);
} }
@JsonIgnore @JsonIgnore

View File

@ -19,12 +19,15 @@
package org.apache.druid.indexing.common.task; package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
@ -35,13 +38,14 @@ import org.junit.Test;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
public class KillUnusedSegmentsTaskTest extends IngestionTestBase public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{ {
private static final String DATA_SOURCE = "dataSource"; private static final String DATA_SOURCE = "dataSource";
private TaskRunner taskRunner; private TestTaskRunner taskRunner;
@Before @Before
public void setup() public void setup()
@ -99,8 +103,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
newSegment(Intervals.of("2019-04-01/2019-05-01"), version) newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
); );
Assert.assertEquals(2L, task.getNumBatchesProcessed()); Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
Assert.assertEquals(1, task.getNumSegmentsKilled());
} }
@ -148,8 +151,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version) newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
); );
Assert.assertEquals(2L, task.getNumBatchesProcessed());
Assert.assertEquals(1, task.getNumSegmentsKilled()); Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats());
} }
@Test @Test
@ -209,8 +212,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
Assert.assertEquals(Collections.emptyList(), unusedSegments); Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(4L, task.getNumBatchesProcessed()); Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
Assert.assertEquals(4, task.getNumSegmentsKilled());
} }
@Test @Test
@ -246,8 +248,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
Assert.assertEquals(Collections.emptyList(), unusedSegments); Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(3L, task.getNumBatchesProcessed());
Assert.assertEquals(4, task.getNumSegmentsKilled()); Assert.assertEquals(new KillTaskReport.Stats(4, 3, 4), getReportedStats());
} }
@Test @Test
@ -362,6 +364,38 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(2, (int) task.getNumTotalBatches()); Assert.assertEquals(2, (int) task.getNumTotalBatches());
} }
@Test
public void testKillTaskReportSerde() throws Exception
{
final String taskId = "test_serde_task";
final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
KillTaskReport report = new KillTaskReport(taskId, stats);
String json = getObjectMapper().writeValueAsString(report);
TaskReport deserializedReport = getObjectMapper().readValue(json, TaskReport.class);
Assert.assertTrue(deserializedReport instanceof KillTaskReport);
KillTaskReport deserializedKillReport = (KillTaskReport) deserializedReport;
Assert.assertEquals(KillTaskReport.REPORT_KEY, deserializedKillReport.getReportKey());
Assert.assertEquals(taskId, deserializedKillReport.getTaskId());
Assert.assertEquals(stats, deserializedKillReport.getPayload());
}
private KillTaskReport.Stats getReportedStats()
{
try {
Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(),
new TypeReference<Map<String, TaskReport>>() { }
).get(KillTaskReport.REPORT_KEY).getPayload();
return getObjectMapper().convertValue(payload, KillTaskReport.Stats.class);
}
catch (Exception e) {
throw new ISE(e, "Error while reading task report");
}
}
private static DataSegment newSegment(Interval interval, String version) private static DataSegment newSegment(Interval interval, String version)
{ {
return new DataSegment( return new DataSegment(

View File

@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Objects; import java.util.Objects;
/** /**
@ -90,6 +89,14 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
return interval; return interval;
} }
/**
* This field has been deprecated as "kill" tasks should not be responsible for
* marking segments as unused. Instead, users should call the Coordinator API
* {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
* Segments may also be marked unused by the Coordinator if they become overshadowed
* or have a {@code DropRule} applied to them.
*/
@Deprecated
@JsonProperty @JsonProperty
public Boolean getMarkAsUnused() public Boolean getMarkAsUnused()
{ {