mirror of https://github.com/apache/druid.git
Allow using different lock types for kill task, remove markAsUnused parameter (#16362)
Changes: - Remove deprecated `markAsUnused` parameter from `KillUnusedSegmentsTask` - Allow `kill` task to use `REPLACE` lock when `useConcurrentLocks` is true - Use `EXCLUSIVE` lock by default
This commit is contained in:
parent
2d0b4e5f1e
commit
d0f3fdab37
|
@ -41,7 +41,7 @@ public class ControllerChatHandlerTest
|
|||
final Controller controller = Mockito.mock(Controller.class);
|
||||
|
||||
TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
|
||||
reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2, 3)));
|
||||
reportMap.put("killUnusedSegments", new KillTaskReport("kill_1", new KillTaskReport.Stats(1, 2)));
|
||||
|
||||
Mockito.when(controller.liveReports())
|
||||
.thenReturn(reportMap);
|
||||
|
|
|
@ -32,13 +32,14 @@ import org.apache.druid.indexer.TaskStatus;
|
|||
import org.apache.druid.indexer.report.KillTaskReport;
|
||||
import org.apache.druid.indexer.report.TaskReport;
|
||||
import org.apache.druid.indexing.common.TaskLock;
|
||||
import org.apache.druid.indexing.common.TaskLockType;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
|
||||
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
|
||||
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
|
||||
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.actions.TaskLocks;
|
||||
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
|
||||
import org.apache.druid.indexing.overlord.Segments;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -46,7 +47,6 @@ import org.apache.druid.java.util.common.logger.Logger;
|
|||
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
|
||||
import org.apache.druid.server.security.ResourceAction;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -66,9 +66,6 @@ import java.util.stream.Collectors;
|
|||
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
|
||||
* JSON serialization fields of this class must correspond to those of {@link
|
||||
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
|
||||
* <p>
|
||||
* The field {@link #isMarkAsUnused()} is now deprecated.
|
||||
* </p>
|
||||
*/
|
||||
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
||||
{
|
||||
|
@ -91,8 +88,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
@Nullable
|
||||
private final List<String> versions;
|
||||
|
||||
@Deprecated
|
||||
private final boolean markAsUnused;
|
||||
/**
|
||||
* Split processing to try and keep each nuke operation relatively short, in the case that either
|
||||
* the database or the storage layer is particularly slow.
|
||||
|
@ -117,7 +112,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("versions") @Nullable List<String> versions,
|
||||
@JsonProperty("context") Map<String, Object> context,
|
||||
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
|
||||
@JsonProperty("batchSize") Integer batchSize,
|
||||
@JsonProperty("limit") @Nullable Integer limit,
|
||||
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
|
||||
|
@ -129,7 +123,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
interval,
|
||||
context
|
||||
);
|
||||
this.markAsUnused = markAsUnused != null && markAsUnused;
|
||||
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
|
||||
if (this.batchSize <= 0) {
|
||||
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
|
||||
|
@ -137,14 +130,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
if (limit != null && limit <= 0) {
|
||||
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
|
||||
}
|
||||
if (Boolean.TRUE.equals(markAsUnused)) {
|
||||
if (limit != null) {
|
||||
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
|
||||
}
|
||||
if (!CollectionUtils.isNullOrEmpty(versions)) {
|
||||
throw InvalidInput.exception("versions[%s] cannot be provided when markAsUnused is enabled.", versions);
|
||||
}
|
||||
}
|
||||
this.versions = versions;
|
||||
this.limit = limit;
|
||||
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
|
||||
|
@ -158,21 +143,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
return versions;
|
||||
}
|
||||
|
||||
/**
|
||||
* This field has been deprecated as "kill" tasks should not be responsible for
|
||||
* marking segments as unused. Instead, users should call the Coordinator API
|
||||
* {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
|
||||
* Segments may also be marked unused by the Coordinator if they become overshadowed
|
||||
* or have a {@code DropRule} applied to them.
|
||||
*/
|
||||
@Deprecated
|
||||
@JsonProperty
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public boolean isMarkAsUnused()
|
||||
{
|
||||
return markAsUnused;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public int getBatchSize()
|
||||
|
@ -214,16 +184,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
// Track stats for reporting
|
||||
int numSegmentsKilled = 0;
|
||||
int numBatchesProcessed = 0;
|
||||
final int numSegmentsMarkedAsUnused;
|
||||
if (markAsUnused) {
|
||||
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
|
||||
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
|
||||
);
|
||||
LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
|
||||
numSegmentsMarkedAsUnused, getDataSource(), getInterval());
|
||||
} else {
|
||||
numSegmentsMarkedAsUnused = 0;
|
||||
}
|
||||
|
||||
// List unused segments
|
||||
int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
|
||||
|
@ -304,7 +264,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
);
|
||||
|
||||
final KillTaskReport.Stats stats =
|
||||
new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
|
||||
new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed);
|
||||
toolbox.getTaskReportFileWriter().write(
|
||||
taskId,
|
||||
TaskReport.buildTaskReports(new KillTaskReport(taskId, stats))
|
||||
|
@ -346,4 +306,41 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
|||
{
|
||||
return LookupLoadingSpec.NONE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady(TaskActionClient taskActionClient) throws Exception
|
||||
{
|
||||
final boolean useConcurrentLocks = Boolean.TRUE.equals(
|
||||
getContextValue(
|
||||
Tasks.USE_CONCURRENT_LOCKS,
|
||||
Tasks.DEFAULT_USE_CONCURRENT_LOCKS
|
||||
)
|
||||
);
|
||||
|
||||
TaskLockType actualLockType = determineLockType(useConcurrentLocks);
|
||||
|
||||
final TaskLock lock = taskActionClient.submit(
|
||||
new TimeChunkLockTryAcquireAction(
|
||||
actualLockType,
|
||||
getInterval()
|
||||
)
|
||||
);
|
||||
if (lock == null) {
|
||||
return false;
|
||||
}
|
||||
lock.assertNotRevoked();
|
||||
return true;
|
||||
}
|
||||
|
||||
private TaskLockType determineLockType(boolean useConcurrentLocks)
|
||||
{
|
||||
TaskLockType actualLockType;
|
||||
if (useConcurrentLocks) {
|
||||
actualLockType = TaskLockType.REPLACE;
|
||||
} else {
|
||||
actualLockType = getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE);
|
||||
}
|
||||
return actualLockType;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -54,7 +54,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
"datasource",
|
||||
Intervals.of("2020-01-01/P1D"),
|
||||
null,
|
||||
false,
|
||||
99,
|
||||
5,
|
||||
DateTimes.nowUtc()
|
||||
|
@ -65,7 +64,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
|
||||
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
|
||||
Assert.assertNull(taskQuery.getVersions());
|
||||
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
|
||||
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
|
||||
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
|
||||
Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
|
||||
|
@ -79,7 +77,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
"datasource",
|
||||
Intervals.of("2020-01-01/P1D"),
|
||||
null,
|
||||
true,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
|
@ -90,7 +87,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
|
||||
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
|
||||
Assert.assertNull(taskQuery.getVersions());
|
||||
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
|
||||
Assert.assertEquals(100, fromJson.getBatchSize());
|
||||
Assert.assertNull(taskQuery.getLimit());
|
||||
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
|
||||
|
@ -105,7 +101,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Intervals.of("2020-01-01/P1D"),
|
||||
null,
|
||||
null,
|
||||
true,
|
||||
99,
|
||||
null,
|
||||
null
|
||||
|
@ -119,7 +114,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
|
||||
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
|
||||
Assert.assertNull(taskQuery.getVersions());
|
||||
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
|
||||
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
|
||||
Assert.assertNull(taskQuery.getLimit());
|
||||
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
|
||||
|
@ -134,7 +128,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Intervals.of("2020-01-01/P1D"),
|
||||
ImmutableList.of("v1", "v2"),
|
||||
null,
|
||||
null,
|
||||
99,
|
||||
100,
|
||||
DateTimes.nowUtc()
|
||||
|
@ -148,7 +141,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
|||
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
|
||||
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
|
||||
Assert.assertEquals(task.getVersions(), taskQuery.getVersions());
|
||||
Assert.assertNull(taskQuery.getMarkAsUnused());
|
||||
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
|
||||
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
|
||||
Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
|
||||
|
|
|
@ -27,6 +27,10 @@ import org.apache.druid.error.DruidExceptionMatcher;
|
|||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.indexer.report.KillTaskReport;
|
||||
import org.apache.druid.indexer.report.TaskReport;
|
||||
import org.apache.druid.indexing.common.SegmentLock;
|
||||
import org.apache.druid.indexing.common.TaskLockType;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
|
||||
import org.apache.druid.indexing.overlord.Segments;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
|
@ -35,6 +39,8 @@ import org.apache.druid.java.util.common.JodaUtils;
|
|||
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -116,52 +122,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
).containsExactlyInAnyOrder(segment1, segment4);
|
||||
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(1, 2, 0),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKillWithMarkUnused() throws Exception
|
||||
{
|
||||
final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
|
||||
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments, null);
|
||||
Assert.assertEquals(segments, announced);
|
||||
|
||||
Assert.assertTrue(
|
||||
getSegmentsMetadataManager().markSegmentAsUnused(
|
||||
segment2.getId()
|
||||
)
|
||||
);
|
||||
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.markAsUnused(true)
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
|
||||
final List<DataSegment> observedUnusedSegments =
|
||||
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
|
||||
DATA_SOURCE,
|
||||
Intervals.of("2019/2020"),
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
Assert.assertEquals(ImmutableList.of(segment2), observedUnusedSegments);
|
||||
Assertions.assertThat(
|
||||
getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
|
||||
DATA_SOURCE,
|
||||
Intervals.of("2019/2020"),
|
||||
Segments.ONLY_VISIBLE
|
||||
)
|
||||
).containsExactlyInAnyOrder(segment1, segment4);
|
||||
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(1, 2, 1),
|
||||
new KillTaskReport.Stats(1, 2),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -199,7 +160,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(4, 3, 0),
|
||||
new KillTaskReport.Stats(4, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -247,7 +208,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(0, 1, 0),
|
||||
new KillTaskReport.Stats(0, 1),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -296,7 +257,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(2, 1, 0),
|
||||
new KillTaskReport.Stats(2, 1),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -345,7 +306,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(0, 1, 0),
|
||||
new KillTaskReport.Stats(0, 1),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -398,7 +359,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(0, 1, 0),
|
||||
new KillTaskReport.Stats(0, 1),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -419,7 +380,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.markAsUnused(true)
|
||||
.build();
|
||||
Assert.assertTrue(task.getInputSourceResources().isEmpty());
|
||||
}
|
||||
|
@ -430,7 +390,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.markAsUnused(true)
|
||||
.build();
|
||||
Assert.assertEquals(LookupLoadingSpec.Mode.NONE, task.getLookupLoadingSpec().getMode());
|
||||
}
|
||||
|
@ -472,7 +431,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(4, 4, 0),
|
||||
new KillTaskReport.Stats(4, 4),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -542,7 +501,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(3, 4, 0),
|
||||
new KillTaskReport.Stats(3, 4),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -628,7 +587,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(ImmutableList.of(segment3), observedUnusedSegments);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(2, 3, 0),
|
||||
new KillTaskReport.Stats(2, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -652,7 +611,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(1, 2, 0),
|
||||
new KillTaskReport.Stats(1, 2),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -732,7 +691,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(ImmutableList.of(segment2, segment3), observedUnusedSegments1);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(2, 3, 0),
|
||||
new KillTaskReport.Stats(2, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -756,7 +715,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(ImmutableList.of(), observedUnusedSegments2);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(2, 3, 0),
|
||||
new KillTaskReport.Stats(2, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -813,7 +772,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(2, 3, 0),
|
||||
new KillTaskReport.Stats(2, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -838,7 +797,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(1, 2, 0),
|
||||
new KillTaskReport.Stats(1, 2),
|
||||
getReportedStats()
|
||||
);
|
||||
|
||||
|
@ -861,10 +820,17 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(segments, announced);
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
Assert.assertTrue(
|
||||
getSegmentsMetadataManager().markSegmentAsUnused(
|
||||
segment.getId()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2018-01-01/2020-01-01"))
|
||||
.markAsUnused(true)
|
||||
.batchSize(3)
|
||||
.build();
|
||||
|
||||
|
@ -880,7 +846,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
|
||||
Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
|
||||
Assert.assertEquals(
|
||||
new KillTaskReport.Stats(4, 3, 4),
|
||||
new KillTaskReport.Stats(4, 3),
|
||||
getReportedStats()
|
||||
);
|
||||
}
|
||||
|
@ -989,45 +955,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidLimitWithMarkAsUnused()
|
||||
{
|
||||
MatcherAssert.assertThat(
|
||||
Assert.assertThrows(
|
||||
DruidException.class,
|
||||
() -> new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2018-01-01/2020-01-01"))
|
||||
.markAsUnused(true)
|
||||
.batchSize(10)
|
||||
.limit(10)
|
||||
.build()
|
||||
),
|
||||
DruidExceptionMatcher.invalidInput().expectMessageIs(
|
||||
"limit[10] cannot be provided when markAsUnused is enabled."
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidVersionsWithMarkAsUnused()
|
||||
{
|
||||
MatcherAssert.assertThat(
|
||||
Assert.assertThrows(
|
||||
DruidException.class,
|
||||
() -> new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2018-01-01/2020-01-01"))
|
||||
.markAsUnused(true)
|
||||
.versions(ImmutableList.of("foo"))
|
||||
.build()
|
||||
),
|
||||
DruidExceptionMatcher.invalidInput().expectMessageIs(
|
||||
"versions[[foo]] cannot be provided when markAsUnused is enabled."
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
|
||||
{
|
||||
|
@ -1046,7 +973,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
{
|
||||
final String taskId = "test_serde_task";
|
||||
|
||||
final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
|
||||
final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2);
|
||||
KillTaskReport report = new KillTaskReport(taskId, stats);
|
||||
|
||||
String json = getObjectMapper().writeValueAsString(report);
|
||||
|
@ -1059,6 +986,150 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
Assert.assertEquals(stats, deserializedKillReport.getPayload());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testIsReadyWithExclusiveLock() throws Exception
|
||||
{
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.build();
|
||||
|
||||
Capture<TimeChunkLockTryAcquireAction> acquireActionCapture = Capture.newInstance();
|
||||
|
||||
TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
|
||||
EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture))).andReturn(
|
||||
new SegmentLock(
|
||||
TaskLockType.EXCLUSIVE,
|
||||
"groupId",
|
||||
"datasource",
|
||||
task.getInterval(),
|
||||
"v1",
|
||||
0,
|
||||
0
|
||||
)
|
||||
);
|
||||
EasyMock.replay(taskActionClient);
|
||||
|
||||
Assert.assertTrue(task.isReady(taskActionClient));
|
||||
|
||||
Assert.assertEquals(TaskLockType.EXCLUSIVE, acquireActionCapture.getValue().getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsReadyWithReplaceLock() throws Exception
|
||||
{
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE))
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.build();
|
||||
|
||||
Capture<TimeChunkLockTryAcquireAction> acquireActionCapture = Capture.newInstance();
|
||||
|
||||
TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
|
||||
EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
|
||||
.andReturn(
|
||||
new SegmentLock(
|
||||
TaskLockType.REPLACE,
|
||||
"groupId",
|
||||
"datasource",
|
||||
task.getInterval(),
|
||||
"v1",
|
||||
0,
|
||||
0
|
||||
)
|
||||
);
|
||||
EasyMock.replay(taskActionClient);
|
||||
|
||||
Assert.assertTrue(task.isReady(taskActionClient));
|
||||
|
||||
Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsReadyWithContextAppendLock() throws Exception
|
||||
{
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.context(ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND))
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.build();
|
||||
|
||||
Capture<TimeChunkLockTryAcquireAction> acquireActionCapture = Capture.newInstance();
|
||||
|
||||
TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
|
||||
EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
|
||||
.andReturn(
|
||||
new SegmentLock(
|
||||
TaskLockType.APPEND,
|
||||
"groupId",
|
||||
"datasource",
|
||||
task.getInterval(),
|
||||
"v1",
|
||||
0,
|
||||
0
|
||||
)
|
||||
);
|
||||
EasyMock.replay(taskActionClient);
|
||||
|
||||
Assert.assertTrue(task.isReady(taskActionClient));
|
||||
|
||||
Assert.assertEquals(TaskLockType.APPEND, acquireActionCapture.getValue().getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsReadyWithConcurrentLockHasPrecedenceOverContextLock() throws Exception
|
||||
{
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE, Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND))
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.build();
|
||||
|
||||
Capture<TimeChunkLockTryAcquireAction> acquireActionCapture = Capture.newInstance();
|
||||
|
||||
TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
|
||||
EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture)))
|
||||
.andReturn(
|
||||
new SegmentLock(
|
||||
TaskLockType.REPLACE,
|
||||
"groupId",
|
||||
"datasource",
|
||||
task.getInterval(),
|
||||
"v1",
|
||||
0,
|
||||
0
|
||||
)
|
||||
);
|
||||
EasyMock.replay(taskActionClient);
|
||||
|
||||
Assert.assertTrue(task.isReady(taskActionClient));
|
||||
|
||||
Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testIsReadyReturnsNullLock() throws Exception
|
||||
{
|
||||
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
|
||||
.dataSource(DATA_SOURCE)
|
||||
.context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, Boolean.TRUE))
|
||||
.interval(Intervals.of("2019-03-01/2019-04-01"))
|
||||
.build();
|
||||
|
||||
Capture<TimeChunkLockTryAcquireAction> acquireActionCapture = Capture.newInstance();
|
||||
|
||||
TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class);
|
||||
EasyMock.expect(taskActionClient.submit(EasyMock.capture(acquireActionCapture))).andReturn(null);
|
||||
EasyMock.replay(taskActionClient);
|
||||
|
||||
Assert.assertFalse(task.isReady(taskActionClient));
|
||||
|
||||
Assert.assertEquals(TaskLockType.REPLACE, acquireActionCapture.getValue().getType());
|
||||
}
|
||||
|
||||
private static class KillUnusedSegmentsTaskBuilder
|
||||
{
|
||||
private String id;
|
||||
|
@ -1066,7 +1137,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
private Interval interval;
|
||||
private List<String> versions;
|
||||
private Map<String, Object> context;
|
||||
private Boolean markAsUnused;
|
||||
private Integer batchSize;
|
||||
private Integer limit;
|
||||
private DateTime maxUsedStatusLastUpdatedTime;
|
||||
|
@ -1101,12 +1171,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
return this;
|
||||
}
|
||||
|
||||
public KillUnusedSegmentsTaskBuilder markAsUnused(Boolean markAsUnused)
|
||||
{
|
||||
this.markAsUnused = markAsUnused;
|
||||
return this;
|
||||
}
|
||||
|
||||
public KillUnusedSegmentsTaskBuilder batchSize(Integer batchSize)
|
||||
{
|
||||
this.batchSize = batchSize;
|
||||
|
@ -1133,7 +1197,6 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
|||
interval,
|
||||
versions,
|
||||
context,
|
||||
markAsUnused,
|
||||
batchSize,
|
||||
limit,
|
||||
maxUsedStatusLastUpdatedTime
|
||||
|
|
|
@ -31,7 +31,7 @@ public class PushedSegmentsReportTest
|
|||
{
|
||||
TaskReport.ReportMap map1 = new TaskReport.ReportMap();
|
||||
TaskReport.ReportMap map2 = new TaskReport.ReportMap();
|
||||
map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3)));
|
||||
map2.put("killTaskReport", new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2)));
|
||||
|
||||
EqualsVerifier.forClass(PushedSegmentsReport.class)
|
||||
.usingGetClass()
|
||||
|
|
|
@ -939,7 +939,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
Intervals.of("2011-04-01/P4D"),
|
||||
null,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
|
@ -1038,7 +1037,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
Intervals.of("2011-04-01/P4D"),
|
||||
null,
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
maxSegmentsToKill,
|
||||
null
|
||||
|
|
|
@ -926,7 +926,7 @@ public class OverlordResourceTest
|
|||
auditManager
|
||||
);
|
||||
|
||||
Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, false, 10, null, null);
|
||||
Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, 10, null, null);
|
||||
overlordResource.taskPost(task, req);
|
||||
|
||||
Assert.assertTrue(auditEntryCapture.hasCaptured());
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.druid.indexer.report;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Objects;
|
||||
|
||||
public class KillTaskReport implements TaskReport
|
||||
|
@ -85,18 +84,15 @@ public class KillTaskReport implements TaskReport
|
|||
{
|
||||
private final int numSegmentsKilled;
|
||||
private final int numBatchesProcessed;
|
||||
private final Integer numSegmentsMarkedAsUnused;
|
||||
|
||||
@JsonCreator
|
||||
public Stats(
|
||||
@JsonProperty("numSegmentsKilled") int numSegmentsKilled,
|
||||
@JsonProperty("numBatchesProcessed") int numBatchesProcessed,
|
||||
@JsonProperty("numSegmentsMarkedAsUnused") @Nullable Integer numSegmentsMarkedAsUnused
|
||||
@JsonProperty("numBatchesProcessed") int numBatchesProcessed
|
||||
)
|
||||
{
|
||||
this.numSegmentsKilled = numSegmentsKilled;
|
||||
this.numBatchesProcessed = numBatchesProcessed;
|
||||
this.numSegmentsMarkedAsUnused = numSegmentsMarkedAsUnused;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -111,13 +107,6 @@ public class KillTaskReport implements TaskReport
|
|||
return numBatchesProcessed;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@JsonProperty
|
||||
public Integer getNumSegmentsMarkedAsUnused()
|
||||
{
|
||||
return numSegmentsMarkedAsUnused;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -129,14 +118,13 @@ public class KillTaskReport implements TaskReport
|
|||
}
|
||||
Stats that = (Stats) o;
|
||||
return numSegmentsKilled == that.numSegmentsKilled
|
||||
&& numBatchesProcessed == that.numBatchesProcessed
|
||||
&& Objects.equals(this.numSegmentsMarkedAsUnused, that.numSegmentsMarkedAsUnused);
|
||||
&& numBatchesProcessed == that.numBatchesProcessed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(numSegmentsKilled, numBatchesProcessed, numSegmentsMarkedAsUnused);
|
||||
return Objects.hash(numSegmentsKilled, numBatchesProcessed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,7 +133,6 @@ public class KillTaskReport implements TaskReport
|
|||
return "Stats{" +
|
||||
"numSegmentsKilled=" + numSegmentsKilled +
|
||||
", numBatchesProcessed=" + numBatchesProcessed +
|
||||
", numSegmentsMarkedAsUnused=" + numSegmentsMarkedAsUnused +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TaskReportSerdeTest
|
|||
@Test
|
||||
public void testSerdeOfKillTaskReport() throws Exception
|
||||
{
|
||||
KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2, 3));
|
||||
KillTaskReport originalReport = new KillTaskReport("taskId", new KillTaskReport.Stats(1, 2));
|
||||
String reportJson = jsonMapper.writeValueAsString(originalReport);
|
||||
TaskReport deserialized = jsonMapper.readValue(reportJson, TaskReport.class);
|
||||
|
||||
|
@ -81,6 +81,7 @@ public class TaskReportSerdeTest
|
|||
|
||||
KillTaskReport deserializedReport = (KillTaskReport) deserialized;
|
||||
Assert.assertEquals(originalReport, deserializedReport);
|
||||
Assert.assertEquals(originalReport.hashCode(), deserializedReport.hashCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -43,7 +43,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
private final Interval interval;
|
||||
@Nullable
|
||||
private final List<String> versions;
|
||||
private final Boolean markAsUnused;
|
||||
private final Integer batchSize;
|
||||
@Nullable
|
||||
private final Integer limit;
|
||||
|
@ -56,7 +55,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("versions") @Nullable List<String> versions,
|
||||
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
|
||||
@JsonProperty("batchSize") Integer batchSize,
|
||||
@JsonProperty("limit") @Nullable Integer limit,
|
||||
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
|
||||
|
@ -72,7 +70,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
this.versions = versions;
|
||||
this.markAsUnused = markAsUnused;
|
||||
this.batchSize = batchSize;
|
||||
this.limit = limit;
|
||||
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
|
||||
|
@ -112,20 +109,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
return versions;
|
||||
}
|
||||
|
||||
/**
|
||||
* This field has been deprecated as "kill" tasks should not be responsible for
|
||||
* marking segments as unused. Instead, users should call the Coordinator API
|
||||
* {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
|
||||
* Segments may also be marked unused by the Coordinator if they become overshadowed
|
||||
* or have a {@code DropRule} applied to them.
|
||||
*/
|
||||
@Deprecated
|
||||
@JsonProperty
|
||||
public Boolean getMarkAsUnused()
|
||||
{
|
||||
return markAsUnused;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Integer getBatchSize()
|
||||
{
|
||||
|
@ -161,7 +144,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
&& Objects.equals(dataSource, that.dataSource)
|
||||
&& Objects.equals(interval, that.interval)
|
||||
&& Objects.equals(versions, that.versions)
|
||||
&& Objects.equals(markAsUnused, that.markAsUnused)
|
||||
&& Objects.equals(batchSize, that.batchSize)
|
||||
&& Objects.equals(limit, that.limit)
|
||||
&& Objects.equals(maxUsedStatusLastUpdatedTime, that.maxUsedStatusLastUpdatedTime);
|
||||
|
@ -170,6 +152,6 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
|||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(id, dataSource, interval, versions, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime);
|
||||
return Objects.hash(id, dataSource, interval, versions, batchSize, limit, maxUsedStatusLastUpdatedTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,6 @@ public interface OverlordClient
|
|||
dataSource,
|
||||
interval,
|
||||
versions,
|
||||
false,
|
||||
null,
|
||||
maxSegmentsToKill,
|
||||
maxUsedStatusLastUpdatedTime
|
||||
|
|
|
@ -33,7 +33,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
|
|||
private static final String DATA_SOURCE = "data_source";
|
||||
public static final DateTime START = DateTimes.nowUtc();
|
||||
private static final Interval INTERVAL = new Interval(START, START.plus(1));
|
||||
private static final Boolean MARK_UNUSED = true;
|
||||
private static final Integer BATCH_SIZE = 999;
|
||||
private static final Integer LIMIT = 1000;
|
||||
|
||||
|
@ -47,7 +46,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
|
|||
DATA_SOURCE,
|
||||
INTERVAL,
|
||||
null,
|
||||
true,
|
||||
BATCH_SIZE,
|
||||
LIMIT,
|
||||
null
|
||||
|
@ -78,12 +76,6 @@ public class ClientKillUnusedSegmentsTaskQueryTest
|
|||
Assert.assertEquals(INTERVAL, clientKillUnusedSegmentsQuery.getInterval());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMarkUnused()
|
||||
{
|
||||
Assert.assertEquals(MARK_UNUSED, clientKillUnusedSegmentsQuery.getMarkAsUnused());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBatchSize()
|
||||
{
|
||||
|
|
|
@ -436,7 +436,6 @@ public class OverlordClientImplTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue