diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 6acd2fc782b..fccb14007b9 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -95,9 +95,10 @@ The available grammar is:
"id": ,
"dataSource": ,
"interval" : ,
- "context": ,
- "batchSize": ,
- "limit":
+ "context": ,
+ "batchSize": ,
+ "limit": ,
+ "maxUsedStatusLastUpdatedTime":
}
```
@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:
| Parameter | Default | Explanation |
|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
-| `limit` | null - no limit | Maximum number of segments for the kill task to delete.|
+| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.|
+| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.|
**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
diff --git a/docs/operations/clean-metadata-store.md b/docs/operations/clean-metadata-store.md
index 202b27805ed..b30374123ee 100644
--- a/docs/operations/clean-metadata-store.md
+++ b/docs/operations/clean-metadata-store.md
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
+- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
- `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.
### Audit records
@@ -189,15 +191,15 @@ druid.coordinator.kill.datasource.on=false
## Example configuration for automated metadata cleanup
-Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old. The exception is for audit logs, which you need to retain for 30 days:
+Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven-day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:
```properties
...
# Schedule the metadata management store task for every hour:
-druid.coordinator.period.metadataStoreManagementPeriod=P1H
+druid.coordinator.period.metadataStoreManagementPeriod=PT1H
-# Set a kill task to poll every day to delete Segment records and segments
-# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
+# Set a kill task to poll every day to delete segment records and segments
+# in deep storage > 4 days old after a 7-day buffer period. When druid.coordinator.kill.on is set to true,
# you can set killDataSourceWhitelist in the dynamic configuration to limit
# the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration.
@@ -205,6 +207,7 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H
druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P4D
+druid.coordinator.kill.bufferPeriod=P7D
druid.coordinator.kill.maxSegments=1000
# Poll every day to delete audit records > 30 days old
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index 150648858c1..bb188952966 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction> getReturnTypeReference()
{
@@ -83,7 +96,7 @@ public class RetrieveUnusedSegmentsAction implements TaskAction perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
- .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
+ .retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime);
}
@Override
@@ -99,6 +112,7 @@ public class RetrieveUnusedSegmentsAction implements TaskAction
* The field {@link #isMarkAsUnused()} is now deprecated.
+ *
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
*/
@Nullable private final Integer limit;
+ /**
+ * The maximum used status last updated time. Any segments with
+ * {@code used_status_last_updated} no later than this time will be included in the kill task.
+ */
+ @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
+
@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@@ -103,7 +110,8 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@JsonProperty("context") Map context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
- @JsonProperty("limit") @Nullable Integer limit
+ @JsonProperty("limit") @Nullable Integer limit,
+ @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
super(
@@ -115,15 +123,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
- throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit);
+ throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
}
if (limit != null && limit <= 0) {
- throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit);
+ throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
- throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled.");
+ throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
}
this.limit = limit;
+ this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}
/**
@@ -155,6 +164,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return limit;
}
+ @Nullable
+ @JsonProperty
+ public DateTime getMaxUsedStatusLastUpdatedTime()
+ {
+ return maxUsedStatusLastUpdatedTime;
+ }
+
@Override
public String getType()
{
@@ -180,7 +196,8 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
- LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
+ LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
+ numSegmentsMarkedAsUnused, getDataSource(), getInterval());
} else {
numSegmentsMarkedAsUnused = 0;
}
@@ -190,9 +207,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@Nullable Integer numTotalBatches = getNumTotalBatches();
List unusedSegments;
LOG.info(
- "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
+ "Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments "
+ + "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
+ getDataSource(),
+ getInterval(),
batchSize,
limit,
+ maxUsedStatusLastUpdatedTime,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);
@@ -217,7 +238,9 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+ .submit(
+ new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime
+ ));
// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap> taskLockMap
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index d23b3820db7..bea3685ca24 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public class MoveTask extends AbstractFixedIntervalTask
// List unused segments
final List unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 1364bcb597f..cc635383fed 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
// List unused segments
final List unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 24d2f0a9043..0fbfb1733a8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -20,9 +20,11 @@
package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -104,7 +106,23 @@ public class RetrieveSegmentsActionsTest
@Test
public void testRetrieveUnusedSegmentsAction()
{
- final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
+ final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
+ final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
+ Assert.assertEquals(expectedUnusedSegments, resultSegments);
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
+ {
+ final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN);
+ final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
+ Assert.assertEquals(ImmutableSet.of(), resultSegments);
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
+ {
+ final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc());
final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index 3ab6bae4688..7e7c9088d61 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Before;
@@ -53,7 +54,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
false,
99,
- 5
+ 5,
+ DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -63,7 +65,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
-
+ Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
}
@Test
@@ -75,6 +77,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
true,
null,
+ null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -85,6 +88,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
+ Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
}
@Test
@@ -97,6 +101,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
null,
true,
99,
+ null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
@@ -110,5 +115,33 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
+ Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
+ }
+
+ @Test
+ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery() throws IOException
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
+ null,
+ "datasource",
+ Intervals.of("2020-01-01/P1D"),
+ null,
+ null,
+ 99,
+ 100,
+ DateTimes.nowUtc()
+ );
+ final byte[] json = objectMapper.writeValueAsBytes(task);
+ final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
+ json,
+ ClientTaskQuery.class
+ );
+ Assert.assertEquals(task.getId(), taskQuery.getId());
+ Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
+ Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
+ Assert.assertNull(taskQuery.getMarkAsUnused());
+ Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
+ Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
+ Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index d77f2e6c243..e2c433536a2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
@@ -29,8 +31,11 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@@ -40,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
@@ -86,18 +92,24 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
- final List unusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+ final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
- Assertions.assertThat(
- getMetadataStorageCoordinator()
- .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
+ Assertions.assertThat(getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
@@ -135,13 +147,19 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
true,
null,
+ null,
null
);
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
final List unusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
Assertions.assertThat(
@@ -166,6 +184,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
true,
null,
+ null,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
@@ -176,10 +195,10 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
final String version = DateTimes.nowUtc().toString();
final Set segments = ImmutableSet.of(
- newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
- newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
- newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
- newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+ newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+ newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+ newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+ newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set announced = getMetadataStorageCoordinator().commitSegments(segments);
@@ -194,58 +213,374 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
);
final KillUnusedSegmentsTask task =
- new KillUnusedSegmentsTask(
- null,
- DATA_SOURCE,
- Intervals.of("2018-01-01/2020-01-01"),
- null,
- false,
- 1,
- 4
- );
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 1,
+ 4,
+ null
+ );
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
// we expect ALL tasks to be deleted
final List unusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
}
+ /**
+ * Test kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. A kill task submitted with null {@code maxUsedStatusLastUpdatedTime} will kill all the unused segments in the kill
+ * interval.
+ */
+ @Test
+ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() throws Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment1.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment4.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment3.getInterval()
+ )
+ );
+
+ final List segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ null
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(3, 4, 0), getReportedStats());
+ }
+
+ /**
+ * Test kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. Consider:
+ * {@code segment1}, {@code segment2} and {@code segment3} have t1, t2 and t3 {@code used_status_last_updated} timestamps
+ * respectively, where t1 < t2 < t3
+ * {@code segment4} is a used segment and therefore shouldn't be killed
+ *
+ *
+ * A kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment2}
+ * After that, a kill task submitted with t3 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment3}.
+ *
+ */
+ @Test
+ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime() throws Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment1.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment4.getInterval()
+ )
+ );
+
+ // Capture the last updated time cutoff
+ final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+ // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again
+ Thread.sleep(1000);
+
+ // now mark the third segment as unused
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment3.getInterval()
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+ final List segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+ final KillUnusedSegmentsTask task1 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime1
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
+
+ final List unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(segment3), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+ final KillUnusedSegmentsTask task2 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime2
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
+
+ final List unusedSegments2 =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+ Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
+ }
+
+ /**
+ * Similar to {@link #testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime()}, but with a different setup.
+ *
+ * Tests kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. Consider:
+ *
+ * {@code segment1} and {@code segment4} have t1 {@code used_status_last_updated} timestamp
+ * {@code segment2} and {@code segment3} have t2 {@code used_status_last_updated} timestamp, where t1 < t2
+ *
+ *
+ * A kill task submitted with t1 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment4}
+ * After that, a kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment2} and {@code segment3}.
+ *
+ */
+ @Test
+ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime2() throws Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 2,
+ getSegmentsMetadataManager().markSegmentsAsUnused(
+ ImmutableSet.of(
+ segment1.getId(),
+ segment4.getId()
+ )
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+ // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again
+ Thread.sleep(1000);
+
+ Assert.assertEquals(
+ 2,
+ getSegmentsMetadataManager().markSegmentsAsUnused(
+ ImmutableSet.of(
+ segment2.getId(),
+ segment3.getId()
+ )
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+ final List segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+ final KillUnusedSegmentsTask task1 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime1
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
+
+ final List unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+ final KillUnusedSegmentsTask task2 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime2
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
+
+ final List unusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+ }
+
@Test
public void testKillBatchSizeThree() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
- newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
- newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
- newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
- newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+ newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+ newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+ newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+ newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
Assert.assertEquals(segments, announced);
final KillUnusedSegmentsTask task =
- new KillUnusedSegmentsTask(
- null,
- DATA_SOURCE,
- Intervals.of("2018-01-01/2020-01-01"),
- null,
- true,
- 3,
- null
- );
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ true,
+ 3,
+ null,
+ null
+ );
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
// we expect ALL tasks to be deleted
- final List<DataSegment> unusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+ final List<DataSegment> unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(Collections.emptyList(), unusedSegments);
@@ -263,6 +598,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertEquals(100, task.computeNextBatchSize(50));
@@ -279,7 +615,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
10,
- 5
+ 5,
+ null
);
Assert.assertEquals(5, task.computeNextBatchSize(0));
}
@@ -295,7 +632,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(5, task.computeNextBatchSize(0));
}
@@ -311,7 +649,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(3, task.computeNextBatchSize(7));
}
@@ -327,6 +666,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertNull(task.getNumTotalBatches());
@@ -343,11 +683,81 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
10,
- 5
+ 5,
+ null
);
Assert.assertEquals(1, (int) task.getNumTotalBatches());
}
+ @Test
+ public void testInvalidLimit()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 10,
+ 0,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "limit[0] must be a positive integer."
+ )
+ );
+ }
+
+ @Test
+ public void testInvalidBatchSize()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 0,
+ 10,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "batchSize[0] must be a positive integer."
+ )
+ );
+ }
+
+ @Test
+ public void testInvalidMarkAsUnusedWithLimit()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ true,
+ 10,
+ 10,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "limit[10] cannot be provided when markAsUnused is enabled."
+ )
+ );
+ }
+
@Test
public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
{
@@ -359,7 +769,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(2, (int) task.getNumTotalBatches());
}
@@ -387,7 +798,9 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
try {
Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(),
- new TypeReference