diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 60377d858ca..260a66a1749 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -95,9 +95,17 @@ The available grammar is:
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
-    "context": <task context>
+    "context": <task context>,
+    "batchSize": <batch_size>
 }
 ```
+
+Some of the parameters used in the task payload are further explained below:
+
+| Parameter   | Default | Explanation |
+|-------------|---------|-------------|
+| `batchSize` | 100     | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). A `kill` task therefore splits the list of unused segments to be deleted into smaller batches, yielding Overlord resources to other task operations between batches. |
+
 **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone.
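For reference, a complete `kill` task payload exercising the new parameter might look like the following sketch; the id, datasource, interval, and batch size values are illustrative, not defaults:

```json
{
  "type": "kill",
  "id": "kill_wikipedia_2023-01",
  "dataSource": "wikipedia",
  "interval": "2023-01-01/2023-02-01",
  "batchSize": 50
}
```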
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 0dbceaa7e78..0d54ae96b05 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -23,7 +23,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
@@ -60,15 +63,35 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
 
+  /**
+   * Default nuke batch size. This is a small enough size that we still get value from batching, while
+   * yielding as quickly as possible. In one real cluster environment backed by MySQL (~2000 rows/sec),
+   * a batch size of 100 means a batch should hold the task lock for less than a second. Depending on
+   * segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100 segments. Over time
+   * we expect the S3 cleanup to get quicker, so the metadata delete should stay under 1 second, which means
+   * we'll be yielding the task lockbox every 1-2 seconds.
+   */
+  private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
+
   private final boolean markAsUnused;
 
+  /**
+   * Split processing into batches to try and keep each nuke operation relatively short, in case either
+   * the database or the storage layer is particularly slow.
+   */
+  private final int batchSize;
+
+  // Counter included primarily for testing.
+  private long numBatchesProcessed = 0;
+
   @JsonCreator
   public KillUnusedSegmentsTask(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("context") Map<String, Object> context,
-      @JsonProperty("markAsUnused") Boolean markAsUnused
+      @JsonProperty("markAsUnused") Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize
   )
   {
     super(
@@ -78,6 +101,8 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
         context
     );
     this.markAsUnused = markAsUnused != null && markAsUnused;
+    this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
+    Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
   }
 
   @JsonProperty
@@ -87,6 +112,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
   {
     return markAsUnused;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
   @Override
   public String getType()
   {
@@ -101,6 +133,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     return ImmutableSet.of();
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  long getNumBatchesProcessed()
+  {
+    return numBatchesProcessed;
+  }
+
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
@@ -114,22 +153,48 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     }
 
     // List unused segments
-    final List<DataSegment> unusedSegments = toolbox
+    final List<DataSegment> allUnusedSegments = toolbox
         .getTaskActionClient()
         .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));
 
-    if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
-      throw new ISE(
-          "Locks[%s] for task[%s] can't cover segments[%s]",
-          taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-          getId(),
-          unusedSegments
-      );
+    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);
+
+    // The individual operations on the toolbox may run for a long time, since they involve calls to
+    // metadata storage and archival object storage, and the task holds the task lockbox while they run.
+    // By splitting the segment list into smaller batches, we give other activity that needs the Overlord's
+    // TaskLockbox an opportunity to acquire the lock in between.
+
+    LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches (batchSize = [%d]).",
+        getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);
+
+    for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
+      if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
+        throw new ISE(
+            "Locks[%s] for task[%s] can't cover segments[%s]",
+            taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+            getId(),
+            unusedSegments
+        );
+      }
+
+      // Kill segments. Order is important here: we want the nuke action to clean up the metadata records
+      // _before_ the segments are removed from storage. This maintains the invariant that a storage segment
+      // always exists while its metadata record is present. If the segment nuke throws an exception, the
+      // segment cleanup is abandoned.
+      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      toolbox.getDataSegmentKiller().kill(unusedSegments);
+      numBatchesProcessed++;
+
+      if (numBatchesProcessed % 10 == 0) {
+        LOG.info("Processed [%d/%d] batches for kill task[%s].",
+            numBatchesProcessed, unusedSegmentBatches.size(), getId());
+      }
     }
 
-    // Kill segments
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-    toolbox.getDataSegmentKiller().kill(unusedSegments);
+    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
+        getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());
 
     return TaskStatus.success(getId());
   }
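The batching above is plain Guava `Lists.partition`: consecutive sublists of at most `batchSize` elements, with a smaller final batch when the list length is not an exact multiple. A minimal standalone sketch of those semantics (stand-in data, not part of the patch):

```java
import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PartitionDemo
{
  public static void main(String[] args)
  {
    // Ten stand-in segment ids, batch size 3 -> sublists [1..3], [4..6], [7..9], [10].
    final List<Integer> segmentIds = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    final List<List<Integer>> batches = Lists.partition(segmentIds, 3);

    System.out.println(batches.size());  // 4, i.e. ceil(10 / 3)
    System.out.println(batches.get(3));  // [10] -- the final batch may be smaller
  }
}
```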
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index e4583c91abf..70cd5fcf19b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -51,7 +51,8 @@
         "killTaskId",
         "datasource",
         Intervals.of("2020-01-01/P1D"),
-        true
+        true,
+        99
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
     final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -59,6 +60,26 @@
     Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
     Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
     Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
+    Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
+  }
+
+  @Test
+  public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefaultBatchSize() throws IOException
+  {
+    final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
+        "killTaskId",
+        "datasource",
+        Intervals.of("2020-01-01/P1D"),
+        true,
+        null
+    );
+    final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
+    final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
+    Assert.assertEquals(taskQuery.getId(), fromJson.getId());
+    Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
+    Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
+    Assert.assertEquals(100, fromJson.getBatchSize());
   }
 
   @Test
@@ -69,7 +90,8 @@
         "datasource",
         Intervals.of("2020-01-01/P1D"),
         null,
-        true
+        true,
+        99
     );
     final byte[] json = objectMapper.writeValueAsBytes(task);
     final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -80,5 +102,6 @@
     Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
     Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
     Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
+    Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
   }
 }
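These serde tests follow the usual Jackson round-trip pattern: serialize the client query, deserialize it back through the polymorphic `Task` type, and compare field by field. A self-contained sketch of the same round-trip with a hypothetical two-field payload (not the real query class):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

class RoundTripDemo
{
  // Hypothetical stand-in for ClientKillUnusedSegmentsTaskQuery.
  public static class Payload
  {
    public String dataSource;
    public Integer batchSize;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    final Payload in = new Payload();
    in.dataSource = "wikipedia";
    in.batchSize = 99;

    // Write to bytes, read back, then assert field-level equality -- exactly what the tests above do.
    final byte[] json = mapper.writeValueAsBytes(in);
    final Payload out = mapper.readValue(json, Payload.class);

    System.out.println(in.dataSource.equals(out.dataSource) && in.batchSize.equals(out.batchSize)); // true
  }
}
```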
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 86c6aeb4095..f57624ed7dd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -33,6 +33,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -79,7 +80,8 @@
             DATA_SOURCE,
             Intervals.of("2019-03-01/2019-04-01"),
             null,
-            false
+            false,
+            null
         );
 
     Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -95,6 +97,7 @@
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
+    Assert.assertEquals(1L, task.getNumBatchesProcessed());
   }
 
@@ -124,7 +127,8 @@
             DATA_SOURCE,
             Intervals.of("2019-03-01/2019-04-01"),
             null,
-            true
+            true,
+            null
         );
 
     Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -140,6 +144,7 @@
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
+    Assert.assertEquals(1L, task.getNumBatchesProcessed());
   }
 
   @Test
@@ -151,11 +156,82 @@
             DATA_SOURCE,
             Intervals.of("2019-03-01/2019-04-01"),
             null,
-            true
+            true,
+            null
         );
     Assert.assertTrue(task.getInputSourceResources().isEmpty());
   }
 
+  @Test
+  public void testKillBatchSizeOne() throws Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final Set<DataSegment> segments = ImmutableSet.of(
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+        newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+    );
+    final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            true,
+            1
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    // we expect ALL segments to be deleted
+    final List<DataSegment> unusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+
+    Assert.assertEquals(Collections.emptyList(), unusedSegments);
+    Assert.assertEquals(4L, task.getNumBatchesProcessed());
+  }
+
+  @Test
+  public void testKillBatchSizeThree() throws Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final Set<DataSegment> segments = ImmutableSet.of(
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+        newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+    );
+    final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            true,
+            3
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    // we expect ALL segments to be deleted
+    final List<DataSegment> unusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+
+    Assert.assertEquals(Collections.emptyList(), unusedSegments);
+    Assert.assertEquals(2L, task.getNumBatchesProcessed());
+  }
+
   private static DataSegment newSegment(Interval interval, String version)
   {
     return new DataSegment(
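The expected `getNumBatchesProcessed()` values are just ceiling division of segment count by batch size: 4 segments with `batchSize` 1 give 4 batches, and with `batchSize` 3 give 2. A tiny sketch of that arithmetic (`expectedBatches` is an illustrative helper, not part of the patch):

```java
class BatchMath
{
  // ceil(numSegments / batchSize) without floating point
  static int expectedBatches(int numSegments, int batchSize)
  {
    return (numSegments + batchSize - 1) / batchSize;
  }

  public static void main(String[] args)
  {
    System.out.println(expectedBatches(4, 1));    // 4 -> testKillBatchSizeOne
    System.out.println(expectedBatches(4, 3));    // 2 -> testKillBatchSizeThree
    System.out.println(expectedBatches(4, 100));  // 1 -> the default-batch-size tests
  }
}
```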
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 9b9d6d36224..2ea7fcedf19 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -948,7 +948,8 @@
         "test_kill_task",
         Intervals.of("2011-04-01/P4D"),
         null,
-        false
+        false,
+        null
     );
 
     final TaskStatus status = runTask(killUnusedSegmentsTask);
@@ -956,6 +957,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
     Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
     Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
+    Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount());
     Assert.assertTrue(
         "expected unused segments get killed",
         expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d64bd1d2226..c8bf8fd28ab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -49,6 +49,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
   private final Set<DataSegment> published = Sets.newConcurrentHashSet();
   private final Set<DataSegment> nuked = Sets.newConcurrentHashSet();
   private final List<DataSegment> unusedSegments;
+  private int deleteSegmentsCount = 0;
 
   public TestIndexerMetadataStorageCoordinator()
   {
@@ -201,6 +202,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
   @Override
   public void deleteSegments(Set<DataSegment> segments)
   {
+    deleteSegmentsCount++;
     nuked.addAll(segments);
   }
 
@@ -220,6 +222,11 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
     return ImmutableSet.copyOf(nuked);
   }
 
+  public int getDeleteSegmentsCount()
+  {
+    return deleteSegmentsCount;
+  }
+
   public void setUnusedSegments(List<DataSegment> newUnusedSegments)
   {
     synchronized (unusedSegments) {
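The `deleteSegmentsCount` addition turns the test coordinator into a simple call-counting spy: the lifecycle test can then assert not only what was nuked but how many metadata delete round-trips it took (one batch for 3 segments under the default batch size of 100). The same pattern in miniature, with illustrative names:

```java
import java.util.HashSet;
import java.util.Set;

// A minimal call-counting spy: records what was deleted and how many batch calls occurred.
class CountingSegmentStore
{
  private final Set<String> nuked = new HashSet<>();
  private int deleteCalls = 0;

  void deleteSegments(Set<String> segmentIds)
  {
    deleteCalls++;            // one increment per batch, regardless of batch contents
    nuked.addAll(segmentIds); // remember everything that was deleted
  }

  int getDeleteCalls()
  {
    return deleteCalls;
  }
}
```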
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -39,19 +39,22 @@
   private final String dataSource;
   private final Interval interval;
   private final Boolean markAsUnused;
+  private final Integer batchSize;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("markAsUnused") Boolean markAsUnused
+      @JsonProperty("markAsUnused") Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.dataSource = dataSource;
     this.interval = interval;
     this.markAsUnused = markAsUnused;
+    this.batchSize = batchSize;
   }
 
   @JsonProperty
@@ -87,6 +90,12 @@
     return markAsUnused;
   }
 
+  @JsonProperty
+  public Integer getBatchSize()
+  {
+    return batchSize;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -100,12 +109,13 @@
     return Objects.equals(id, that.id)
            && Objects.equals(dataSource, that.dataSource)
            && Objects.equals(interval, that.interval)
-           && Objects.equals(markAsUnused, that.markAsUnused);
+           && Objects.equals(markAsUnused, that.markAsUnused)
+           && Objects.equals(batchSize, that.batchSize);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, interval, markAsUnused);
+    return Objects.hash(id, dataSource, interval, markAsUnused, batchSize);
   }
 }
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 0cb77ee7045..51b4323a11f 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -86,7 +86,7 @@
   default ListenableFuture<String> runKillTask(String idPrefix, String dataSource, Interval interval)
   {
     final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
-    final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
+    final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null);
     return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId);
   }
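Note that `runKillTask` above always passes `null` for `batchSize`, so kill tasks launched through this helper fall back to the server-side default of 100. A caller wanting a different batch size would construct the query directly; a sketch using the constructor from this patch (the task id and values are illustrative):

```java
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.java.util.common.Intervals;

class KillQueryExample
{
  public static void main(String[] args)
  {
    final ClientKillUnusedSegmentsTaskQuery query = new ClientKillUnusedSegmentsTaskQuery(
        "kill_wikipedia_2023-01",               // task id
        "wikipedia",                            // dataSource
        Intervals.of("2023-01-01/2023-02-01"),  // interval to purge
        false,                                  // markAsUnused
        500                                     // batchSize: larger batches, fewer lock yields
    );
    System.out.println(query.getBatchSize());   // 500
  }
}
```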
ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true); + clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true, BATCH_SIZE); } @After @@ -73,12 +74,18 @@ public class ClientKillUnusedSegmentsTaskQueryTest Assert.assertEquals(MARK_UNUSED, clientKillUnusedSegmentsQuery.getMarkAsUnused()); } + @Test + public void testGetBatchSize() + { + Assert.assertEquals(BATCH_SIZE, clientKillUnusedSegmentsQuery.getBatchSize()); + } + @Test public void testEquals() { EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class) .usingGetClass() - .withNonnullFields("id", "dataSource", "interval") + .withNonnullFields("id", "dataSource", "interval", "batchSize") .verify(); } } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 7e747956a1a..c1e48c496ca 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -422,7 +422,7 @@ public class OverlordClientImplTest public void test_taskPayload() throws ExecutionException, InterruptedException, JsonProcessingException { final String taskID = "taskId_1"; - final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null); + final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID),