diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
index 93436c84ead..dd04814d879 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/ControllerChatHandlerTest.java
@@ -41,7 +41,7 @@ public class ControllerChatHandlerTest
final Controller controller = Mockito.mock(Controller.class);
TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
- reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2, 3)));
+ reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2)));
Mockito.when(controller.liveReports())
.thenReturn(reportMap);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 78a7abae7a6..6a1f9e95a06 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -32,13 +32,14 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -46,7 +47,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -66,9 +66,6 @@ import java.util.stream.Collectors;
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
- *
- * The field {@link #isMarkAsUnused()} is now deprecated.
- *
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -91,8 +88,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@Nullable
private final List versions;
- @Deprecated
- private final boolean markAsUnused;
/**
* Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
@@ -117,7 +112,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@JsonProperty("interval") Interval interval,
@JsonProperty("versions") @Nullable List versions,
@JsonProperty("context") Map context,
- @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
@@ -129,7 +123,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
interval,
context
);
- this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
@@ -137,14 +130,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
if (limit != null && limit <= 0) {
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
- if (Boolean.TRUE.equals(markAsUnused)) {
- if (limit != null) {
- throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
- }
- if (!CollectionUtils.isNullOrEmpty(versions)) {
- throw InvalidInput.exception("versions[%s] cannot be provided when markAsUnused is enabled.", versions);
- }
- }
this.versions = versions;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
@@ -158,21 +143,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return versions;
}
- /**
- * This field has been deprecated as "kill" tasks should not be responsible for
- * marking segments as unused. Instead, users should call the Coordinator API
- * {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
- * Segments may also be marked unused by the Coordinator if they become overshadowed
- * or have a {@code DropRule} applied to them.
- */
- @Deprecated
- @JsonProperty
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
- public boolean isMarkAsUnused()
- {
- return markAsUnused;
- }
-
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getBatchSize()
@@ -214,16 +184,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
- final int numSegmentsMarkedAsUnused;
- if (markAsUnused) {
- numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
- new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
- );
- LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
- numSegmentsMarkedAsUnused, getDataSource(), getInterval());
- } else {
- numSegmentsMarkedAsUnused = 0;
- }
// List unused segments
int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
@@ -304,7 +264,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
);
final KillTaskReport.Stats stats =
- new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
+ new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed);
toolbox.getTaskReportFileWriter().write(
taskId,
TaskReport.buildTaskReports(new KillTaskReport(taskId, stats))
@@ -346,4 +306,41 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
return LookupLoadingSpec.NONE;
}
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ final boolean useConcurrentLocks = Boolean.TRUE.equals(
+ getContextValue(
+ Tasks.USE_CONCURRENT_LOCKS,
+ Tasks.DEFAULT_USE_CONCURRENT_LOCKS
+ )
+ );
+
+ TaskLockType actualLockType = determineLockType(useConcurrentLocks);
+
+ final TaskLock lock = taskActionClient.submit(
+ new TimeChunkLockTryAcquireAction(
+ actualLockType,
+ getInterval()
+ )
+ );
+ if (lock == null) {
+ return false;
+ }
+ lock.assertNotRevoked();
+ return true;
+ }
+
+ private TaskLockType determineLockType(boolean useConcurrentLocks)
+ {
+ TaskLockType actualLockType;
+ if (useConcurrentLocks) {
+ actualLockType = TaskLockType.REPLACE;
+ } else {
+ actualLockType = getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE);
+ }
+ return actualLockType;
+ }
+
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index e6dbd0bad13..9154a0f5656 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -54,7 +54,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
- false,
99,
5,
DateTimes.nowUtc()
@@ -65,7 +64,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
- Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
@@ -79,7 +77,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
- true,
null,
null,
null
@@ -90,7 +87,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
- Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
@@ -105,7 +101,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
null,
null,
- true,
99,
null,
null
@@ -119,7 +114,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertNull(taskQuery.getVersions());
- Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
@@ -134,7 +128,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
ImmutableList.of("v1", "v2"),
null,
- null,
99,
100,
DateTimes.nowUtc()
@@ -148,7 +141,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.getVersions(), taskQuery.getVersions());
- Assert.assertNull(taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 47c52ad2ba0..f888ace5a54 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -27,6 +27,10 @@ import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.SegmentLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@@ -35,6 +39,8 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -116,52 +122,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
).containsExactlyInAnyOrder(segment1, segment4);
Assert.assertEquals(
- new KillTaskReport.Stats(1, 2, 0),
- getReportedStats()
- );
- }
-
- @Test
- public void testKillWithMarkUnused() throws Exception
- {
- final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
- final Set announced = getMetadataStorageCoordinator().commitSegments(segments, null);
- Assert.assertEquals(segments, announced);
-
- Assert.assertTrue(
- getSegmentsMetadataManager().markSegmentAsUnused(
- segment2.getId()
- )
- );
-
- final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
- .dataSource(DATA_SOURCE)
- .interval(Intervals.of("2019-03-01/2019-04-01"))
- .markAsUnused(true)
- .build();
-
- Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
-
- final List observedUnusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
- DATA_SOURCE,
- Intervals.of("2019/2020"),
- null,
- null,
- null
- );
-
- Assert.assertEquals(ImmutableList.of(segment2), observedUnusedSegments);
- Assertions.assertThat(
- getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
- DATA_SOURCE,
- Intervals.of("2019/2020"),
- Segments.ONLY_VISIBLE
- )
- ).containsExactlyInAnyOrder(segment1, segment4);
-
- Assert.assertEquals(
- new KillTaskReport.Stats(1, 2, 1),
+ new KillTaskReport.Stats(1, 2),
getReportedStats()
);
}
@@ -199,7 +160,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(4, 3, 0),
+ new KillTaskReport.Stats(4, 3),
getReportedStats()
);
@@ -247,7 +208,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(0, 1, 0),
+ new KillTaskReport.Stats(0, 1),
getReportedStats()
);
@@ -296,7 +257,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(2, 1, 0),
+ new KillTaskReport.Stats(2, 1),
getReportedStats()
);
@@ -345,7 +306,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(0, 1, 0),
+ new KillTaskReport.Stats(0, 1),
getReportedStats()
);
@@ -398,7 +359,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(0, 1, 0),
+ new KillTaskReport.Stats(0, 1),
getReportedStats()
);
@@ -419,7 +380,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2019-03-01/2019-04-01"))
- .markAsUnused(true)
.build();
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
@@ -430,7 +390,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2019-03-01/2019-04-01"))
- .markAsUnused(true)
.build();
Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode());
}
@@ -472,7 +431,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
Assert.assertEquals(
- new KillTaskReport.Stats(4, 4, 0),
+ new KillTaskReport.Stats(4, 4),
getReportedStats()
);
}
@@ -542,7 +501,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
Assert.assertEquals(
- new KillTaskReport.Stats(3, 4, 0),
+ new KillTaskReport.Stats(3, 4),
getReportedStats()
);
}
@@ -628,7 +587,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableList.of(segment3), observedUnusedSegments);
Assert.assertEquals(
- new KillTaskReport.Stats(2, 3, 0),
+ new KillTaskReport.Stats(2, 3),
getReportedStats()
);
@@ -652,7 +611,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2);
Assert.assertEquals(
- new KillTaskReport.Stats(1, 2, 0),
+ new KillTaskReport.Stats(1, 2),
getReportedStats()
);
}
@@ -732,7 +691,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableList.of(segment2, segment3), observedUnusedSegments1);
Assert.assertEquals(
- new KillTaskReport.Stats(2, 3, 0),
+ new KillTaskReport.Stats(2, 3),
getReportedStats()
);
@@ -756,7 +715,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2);
Assert.assertEquals(
- new KillTaskReport.Stats(2, 3, 0),
+ new KillTaskReport.Stats(2, 3),
getReportedStats()
);
}
@@ -813,7 +772,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(2, 3, 0),
+ new KillTaskReport.Stats(2, 3),
getReportedStats()
);
@@ -838,7 +797,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
Assert.assertEquals(
- new KillTaskReport.Stats(1, 2, 0),
+ new KillTaskReport.Stats(1, 2),
getReportedStats()
);
@@ -861,10 +820,17 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(segments, announced);
+ for (DataSegment segment : segments) {
+ Assert.assertTrue(
+ getSegmentsMetadataManager().markSegmentAsUnused(
+ segment.getId()
+ )
+ );
+ }
+
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
.dataSource(DATA_SOURCE)
.interval(Intervals.of("2018-01-01/2020-01-01"))
- .markAsUnused(true)
.batchSize(3)
.build();
@@ -880,7 +846,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
Assert.assertEquals(
- new KillTaskReport.Stats(4, 3, 4),
+ new KillTaskReport.Stats(4, 3),
getReportedStats()
);
}
@@ -989,45 +955,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
);
}
- @Test
- public void testInvalidLimitWithMarkAsUnused()
- {
- MatcherAssert.assertThat(
- Assert.assertThrows(
- DruidException.class,
- () -> new KillUnusedSegmentsTaskBuilder()
- .dataSource(DATA_SOURCE)
- .interval(Intervals.of("2018-01-01/2020-01-01"))
- .markAsUnused(true)
- .batchSize(10)
- .limit(10)
- .build()
- ),
- DruidExceptionMatcher.invalidInput().expectMessageIs(
- "limit[10] cannot be provided when markAsUnused is enabled."
- )
- );
- }
-
- @Test
- public void testInvalidVersionsWithMarkAsUnused()
- {
- MatcherAssert.assertThat(
- Assert.assertThrows(
- DruidException.class,
- () -> new KillUnusedSegmentsTaskBuilder()
- .dataSource(DATA_SOURCE)
- .interval(Intervals.of("2018-01-01/2020-01-01"))
- .markAsUnused(true)
- .versions(ImmutableList.of("foo"))
- .build()
- ),
- DruidExceptionMatcher.invalidInput().expectMessageIs(
- "versions[[foo]] cannot be provided when markAsUnused is enabled."
- )
- );
- }
-
@Test
public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
{
@@ -1046,7 +973,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
final String taskId = "test_serde_task";
- final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
+ final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2);
KillTaskReport report = new KillTaskReport(taskId, stats);
String json = getObjectMapper().writeValueAsString(report);
@@ -1059,6 +986,150 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(stats, deserializedKillReport.getPayload());
}
+
+ @Test
+ public void testIsReadyWithExclusiveLock() throws Exception
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.of("2019-03-01/2019-04-01"))
+ .build();
+
+ Capture acquireActionCapture = Capture.newInstance();
+
+ TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+ EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture))).andReturn(
+ new SegmentLock(
+ TaskLockType.EXCLUSIVE,
+ "groupId",
+ "datasource",
+ task.getInterval(),
+ "v1",
+ 0,
+ 0
+ )
+ );
+ EasyMock.replay(taskActionClient);
+
+ Assert.assertTrue(task.isReady(taskActionClient));
+
+ Assert.assertEquals(TaskLockType.EXCLUSIVE, acquireActionCapture.getValue().getType());
+ }
+
+ @Test
+ public void testIsReadyWithReplaceLock() throws Exception
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE))
+ .interval(Intervals.of("2019-03-01/2019-04-01"))
+ .build();
+
+ Capture acquireActionCapture = Capture.newInstance();
+
+ TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+ EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
+ .andReturn(
+ new SegmentLock(
+ TaskLockType.REPLACE,
+ "groupId",
+ "datasource",
+ task.getInterval(),
+ "v1",
+ 0,
+ 0
+ )
+ );
+ EasyMock.replay(taskActionClient);
+
+ Assert.assertTrue(task.isReady(taskActionClient));
+
+ Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
+ }
+
+ @Test
+ public void testIsReadyWithContextAppendLock() throws Exception
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .context(ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND))
+ .interval(Intervals.of("2019-03-01/2019-04-01"))
+ .build();
+
+ Capture acquireActionCapture = Capture.newInstance();
+
+ TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+ EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
+ .andReturn(
+ new SegmentLock(
+ TaskLockType.APPEND,
+ "groupId",
+ "datasource",
+ task.getInterval(),
+ "v1",
+ 0,
+ 0
+ )
+ );
+ EasyMock.replay(taskActionClient);
+
+ Assert.assertTrue(task.isReady(taskActionClient));
+
+ Assert.assertEquals(TaskLockType.APPEND, acquireActionCapture.getValue().getType());
+ }
+
+ @Test
+ public void testIsReadyWithConcurrentLockHasPrecedenceOverContextLock() throws Exception
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE, Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND))
+ .interval(Intervals.of("2019-03-01/2019-04-01"))
+ .build();
+
+ Capture acquireActionCapture = Capture.newInstance();
+
+ TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+ EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
+ .andReturn(
+ new SegmentLock(
+ TaskLockType.REPLACE,
+ "groupId",
+ "datasource",
+ task.getInterval(),
+ "v1",
+ 0,
+ 0
+ )
+ );
+ EasyMock.replay(taskActionClient);
+
+ Assert.assertTrue(task.isReady(taskActionClient));
+
+ Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
+ }
+
+
+ @Test
+ public void testIsReadyReturnsNullLock() throws Exception
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE))
+ .interval(Intervals.of("2019-03-01/2019-04-01"))
+ .build();
+
+ Capture acquireActionCapture = Capture.newInstance();
+
+ TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
+ EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture))).andReturn(null);
+ EasyMock.replay(taskActionClient);
+
+ Assert.assertFalse(task.isReady(taskActionClient));
+
+ Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
+ }
+
private static class KillUnusedSegmentsTaskBuilder
{
private String id;
@@ -1066,7 +1137,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
private Interval interval;
private List versions;
private Map context;
- private Boolean markAsUnused;
private Integer batchSize;
private Integer limit;
private DateTime maxUsedStatusLastUpdatedTime;
@@ -1101,12 +1171,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
return this;
}
- public KillUnusedSegmentsTaskBuilder markAsUnused(Boolean markAsUnused)
- {
- this.markAsUnused = markAsUnused;
- return this;
- }
-
public KillUnusedSegmentsTaskBuilder batchSize(Integer batchSize)
{
this.batchSize = batchSize;
@@ -1133,7 +1197,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
interval,
versions,
context,
- markAsUnused,
batchSize,
limit,
maxUsedStatusLastUpdatedTime
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
index b46f3da5b1f..f42f7d4b27b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PushedSegmentsReportTest.java
@@ -31,7 +31,7 @@ public class PushedSegmentsReportTest
{
TaskReport.ReportMap map1 = new TaskReport.ReportMap();
TaskReport.ReportMap map2 = new TaskReport.ReportMap();
- map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)));
+ map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2)));
EqualsVerifier.forClass(PushedSegmentsReport.class)
.usingGetClass()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index ece18aa852d..e94f593ccb9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -939,7 +939,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
Intervals.of("2011-04-01/P4D"),
null,
null,
- false,
null,
null,
null
@@ -1038,7 +1037,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
Intervals.of("2011-04-01/P4D"),
null,
null,
- false,
null,
maxSegmentsToKill,
null
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 41626341bae..51b0cfe742e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -926,7 +926,7 @@ public class OverlordResourceTest
auditManager
);
- Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, false, 10, null, null);
+ Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, 10, null, null);
overlordResource.taskPost(task, req);
Assert.assertTrue(auditEntryCapture.hasCaptured());
diff --git a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
index c0736a9e1a9..a877d2bf7b1 100644
--- a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
+++ b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexer.report;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import javax.annotation.Nullable;
import java.util.Objects;
public class KillTaskReport implements TaskReport
@@ -85,18 +84,15 @@ public class KillTaskReport implements TaskReport
{
private final int numSegmentsKilled;
private final int numBatchesProcessed;
- private final Integer numSegmentsMarkedAsUnused;
@JsonCreator
public Stats(
@JsonProperty("numSegmentsKilled") int numSegmentsKilled,
- @JsonProperty("numBatchesProcessed") int numBatchesProcessed,
- @JsonProperty("numSegmentsMarkedAsUnused") @Nullable Integer numSegmentsMarkedAsUnused
+ @JsonProperty("numBatchesProcessed") int numBatchesProcessed
)
{
this.numSegmentsKilled = numSegmentsKilled;
this.numBatchesProcessed = numBatchesProcessed;
- this.numSegmentsMarkedAsUnused = numSegmentsMarkedAsUnused;
}
@JsonProperty
@@ -111,13 +107,6 @@ public class KillTaskReport implements TaskReport
return numBatchesProcessed;
}
- @Nullable
- @JsonProperty
- public Integer getNumSegmentsMarkedAsUnused()
- {
- return numSegmentsMarkedAsUnused;
- }
-
@Override
public boolean equals(Object o)
{
@@ -129,14 +118,13 @@ public class KillTaskReport implements TaskReport
}
Stats that = (Stats) o;
return numSegmentsKilled == that.numSegmentsKilled
- && numBatchesProcessed == that.numBatchesProcessed
- && Objects.equals(this.numSegmentsMarkedAsUnused, that.numSegmentsMarkedAsUnused);
+ && numBatchesProcessed == that.numBatchesProcessed;
}
@Override
public int hashCode()
{
- return Objects.hash(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
+ return Objects.hash(numSegmentsKilled, numBatchesProcessed);
}
@Override
@@ -145,7 +133,6 @@ public class KillTaskReport implements TaskReport
return "Stats{" +
"numSegmentsKilled=" + numSegmentsKilled +
", numBatchesProcessed=" + numBatchesProcessed +
- ", numSegmentsMarkedAsUnused=" + numSegmentsMarkedAsUnused +
'}';
}
}
diff --git a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
index d71964eed66..4ff36d731df 100644
--- a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java
@@ -73,7 +73,7 @@ public class TaskReportSerdeTest
@Test
public void testSerdeOfKillTaskReport() throws Exception
{
- KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3));
+ KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2));
String reportJson = jsonMapper.writeValueAsString(originalReport);
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
@@ -81,6 +81,7 @@ public class TaskReportSerdeTest
KillTaskReport deserializedReport = (KillTaskReport) deserialized;
Assert.assertEquals(originalReport, deserializedReport);
+ Assert.assertEquals(originalReport.hashCode(), deserializedReport.hashCode());
}
@Test
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 4dfad3c97c0..650a79a75e7 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -43,7 +43,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
private final Interval interval;
@Nullable
private final List versions;
- private final Boolean markAsUnused;
private final Integer batchSize;
@Nullable
private final Integer limit;
@@ -56,7 +55,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("versions") @Nullable List versions,
- @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
@@ -72,7 +70,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
this.dataSource = dataSource;
this.interval = interval;
this.versions = versions;
- this.markAsUnused = markAsUnused;
this.batchSize = batchSize;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
@@ -112,20 +109,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
return versions;
}
- /**
- * This field has been deprecated as "kill" tasks should not be responsible for
- * marking segments as unused. Instead, users should call the Coordinator API
- * {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
- * Segments may also be marked unused by the Coordinator if they become overshadowed
- * or have a {@code DropRule} applied to them.
- */
- @Deprecated
- @JsonProperty
- public Boolean getMarkAsUnused()
- {
- return markAsUnused;
- }
-
@JsonProperty
public Integer getBatchSize()
{
@@ -161,7 +144,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
&& Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(versions, that.versions)
- && Objects.equals(markAsUnused, that.markAsUnused)
&& Objects.equals(batchSize, that.batchSize)
&& Objects.equals(limit, that.limit)
&& Objects.equals(maxUsedStatusLastUpdatedTime, that.maxUsedStatusLastUpdatedTime);
@@ -170,6 +152,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
@Override
public int hashCode()
{
- return Objects.hash(id, dataSource, interval, versions, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime);
+ return Objects.hash(id, dataSource, interval, versions, batchSize, limit, maxUsedStatusLastUpdatedTime);
}
}
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 2d4dfe4aae7..310684206d2 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -109,7 +109,6 @@ public interface OverlordClient
dataSource,
interval,
versions,
- false,
null,
maxSegmentsToKill,
maxUsedStatusLastUpdatedTime
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index d683d59ac6e..9f9eb8075ef 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -33,7 +33,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
private static final String DATA_SOURCE = "data_source";
public static final DateTime START = DateTimes.nowUtc();
private static final Interval INTERVAL = new Interval(START, START.plus(1));
- private static final Boolean MARK_UNUSED = true;
private static final Integer BATCH_SIZE = 999;
private static final Integer LIMIT = 1000;
@@ -47,7 +46,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
DATA_SOURCE,
INTERVAL,
null,
- true,
BATCH_SIZE,
LIMIT,
null
@@ -78,12 +76,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
Assert.assertEquals(INTERVAL, clientKillUnusedSegmentsQuery.getInterval());
}
- @Test
- public void testGetMarkUnused()
- {
- Assert.assertEquals(MARK_UNUSED, clientKillUnusedSegmentsQuery.getMarkAsUnused());
- }
-
@Test
public void testGetBatchSize()
{
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 57bab1fed0d..8c3b867e368 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -436,7 +436,6 @@ public class OverlordClientImplTest
null,
null,
null,
- null,
null
);