mirror of
https://github.com/apache/druid.git
synced 2025-02-25 04:16:07 +00:00
Allow kill task to mark segments as unused (#11501)
* Allow kill task to mark segments as unused * Add IndexerSQLMetadataStorageCoordinator test * Update docs/ingestion/data-management.md Co-authored-by: Jihoon Son <jihoonson@apache.org> * Add warning to kill task doc Co-authored-by: Jihoon Son <jihoonson@apache.org>
This commit is contained in:
parent
280c08045f
commit
9b250c54aa
@ -95,7 +95,9 @@ A data deletion tutorial is available at [Tutorial: Deleting data](../tutorials/
|
||||
|
||||
## Kill Task
|
||||
|
||||
Kill tasks delete all information about a segment and removes it from deep storage. Segments to kill must be unused (used==0) in the Druid segment table. The available grammar is:
|
||||
The kill task deletes all information about segments and removes them from deep storage. Segments to kill must be unused (used==0) in the Druid segment table.
|
||||
|
||||
The available grammar is:
|
||||
|
||||
```json
|
||||
{
|
||||
@ -103,10 +105,15 @@ Kill tasks delete all information about a segment and removes it from deep stora
|
||||
"id": <task_id>,
|
||||
"dataSource": <task_datasource>,
|
||||
"interval" : <all_segments_in_this_interval_will_die!>,
|
||||
"markAsUnused": <true|false>,
|
||||
"context": <task context>
|
||||
}
|
||||
```
|
||||
|
||||
If `markAsUnused` is true (default is false), the kill task will first mark any segments within the specified interval as unused, before deleting the unused segments within the interval.
|
||||
|
||||
**WARNING!** The kill task permanently removes all information about the affected segments from the metadata store and deep storage. These segments cannot be recovered after the kill task runs; this operation cannot be undone.
|
||||
|
||||
## Retention
|
||||
|
||||
Druid supports retention rules, which are used to define intervals of time where data should be preserved, and intervals where data should be discarded.
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexing.common.actions;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import org.apache.druid.indexing.common.task.Task;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
|
||||
{
|
||||
@JsonIgnore
|
||||
private final String dataSource;
|
||||
|
||||
@JsonIgnore
|
||||
private final Interval interval;
|
||||
|
||||
@JsonCreator
|
||||
public MarkSegmentsAsUnusedAction(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<Integer> getReturnTypeReference()
|
||||
{
|
||||
return new TypeReference<Integer>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer perform(Task task, TaskActionToolbox toolbox)
|
||||
{
|
||||
int numMarked = toolbox.getIndexerMetadataStorageCoordinator()
|
||||
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
|
||||
return numMarked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAudited()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
@ -38,6 +38,7 @@ import org.apache.druid.indexing.common.task.Task;
|
||||
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
|
||||
// Type name doesn't correspond to the name of the class for backward compatibility.
|
||||
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
|
||||
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
|
||||
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
|
||||
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
|
||||
@JsonSubTypes.Type(name = SegmentAllocateAction.TYPE, value = SegmentAllocateAction.class),
|
||||
|
@ -25,11 +25,13 @@ import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
import org.apache.druid.indexing.common.TaskLock;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
|
||||
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
|
||||
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.actions.TaskLocks;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
@ -50,13 +52,17 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
||||
{
|
||||
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
|
||||
|
||||
private final boolean markAsUnused;
|
||||
|
||||
@JsonCreator
|
||||
public KillUnusedSegmentsTask(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
@JsonProperty("context") Map<String, Object> context,
|
||||
@JsonProperty("markAsUnused") Boolean markAsUnused
|
||||
)
|
||||
{
|
||||
super(
|
||||
@ -65,6 +71,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
||||
interval,
|
||||
context
|
||||
);
|
||||
this.markAsUnused = markAsUnused != null && markAsUnused;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isMarkAsUnused()
|
||||
{
|
||||
return markAsUnused;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -77,6 +90,14 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());
|
||||
|
||||
if (markAsUnused) {
|
||||
int numMarked = toolbox.getTaskActionClient().submit(
|
||||
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
|
||||
);
|
||||
LOG.info("Marked %d segments as unused.", numMarked);
|
||||
}
|
||||
|
||||
// List unused segments
|
||||
final List<DataSegment> unusedSegments = toolbox
|
||||
.getTaskActionClient()
|
||||
|
@ -50,13 +50,15 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
||||
final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
|
||||
"killTaskId",
|
||||
"datasource",
|
||||
Intervals.of("2020-01-01/P1D")
|
||||
Intervals.of("2020-01-01/P1D"),
|
||||
true
|
||||
);
|
||||
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
|
||||
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
|
||||
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
|
||||
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
|
||||
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
|
||||
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -66,7 +68,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
||||
null,
|
||||
"datasource",
|
||||
Intervals.of("2020-01-01/P1D"),
|
||||
null
|
||||
null,
|
||||
true
|
||||
);
|
||||
final byte[] json = objectMapper.writeValueAsBytes(task);
|
||||
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
|
||||
@ -76,5 +79,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
|
||||
Assert.assertEquals(task.getId(), taskQuery.getId());
|
||||
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
|
||||
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
|
||||
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,58 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
||||
);
|
||||
|
||||
final KillUnusedSegmentsTask task =
|
||||
new KillUnusedSegmentsTask(null, DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null);
|
||||
new KillUnusedSegmentsTask(
|
||||
null,
|
||||
DATA_SOURCE,
|
||||
Intervals.of("2019-03-01/2019-04-01"),
|
||||
null,
|
||||
false
|
||||
);
|
||||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
|
||||
final List<DataSegment> unusedSegments =
|
||||
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
|
||||
|
||||
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
|
||||
Assertions.assertThat(
|
||||
getMetadataStorageCoordinator()
|
||||
.retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
|
||||
).containsExactlyInAnyOrder(
|
||||
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
|
||||
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testKillWithMarkUnused() throws Exception
|
||||
{
|
||||
final String version = DateTimes.nowUtc().toString();
|
||||
final Set<DataSegment> segments = ImmutableSet.of(
|
||||
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
|
||||
newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
|
||||
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
|
||||
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
|
||||
);
|
||||
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
|
||||
|
||||
Assert.assertEquals(segments, announced);
|
||||
|
||||
Assert.assertTrue(
|
||||
getSegmentsMetadataManager().markSegmentAsUnused(
|
||||
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
|
||||
)
|
||||
);
|
||||
|
||||
final KillUnusedSegmentsTask task =
|
||||
new KillUnusedSegmentsTask(
|
||||
null,
|
||||
DATA_SOURCE,
|
||||
Intervals.of("2019-03-01/2019-04-01"),
|
||||
null,
|
||||
true
|
||||
);
|
||||
|
||||
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||
|
||||
|
@ -929,7 +929,13 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
||||
}
|
||||
|
||||
final Task killUnusedSegmentsTask =
|
||||
new KillUnusedSegmentsTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null);
|
||||
new KillUnusedSegmentsTask(
|
||||
null,
|
||||
"test_kill_task",
|
||||
Intervals.of("2011-04-01/P4D"),
|
||||
null,
|
||||
false
|
||||
);
|
||||
|
||||
final TaskStatus status = runTask(killUnusedSegmentsTask);
|
||||
Assert.assertEquals(taskLocation, status.getLocation());
|
||||
|
@ -106,6 +106,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
|
||||
}
|
||||
}
|
||||
|
||||
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{
  // Test stub: this fake coordinator does not track used/unused state, so it
  // always reports that zero segments were marked unused.
  return 0;
}
|
||||
|
||||
@Override
|
||||
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
|
||||
{
|
||||
|
@ -38,17 +38,20 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
||||
private final String id;
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
private final Boolean markAsUnused;
|
||||
|
||||
@JsonCreator
|
||||
public ClientKillUnusedSegmentsTaskQuery(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("markAsUnused") Boolean markAsUnused
|
||||
)
|
||||
{
|
||||
this.id = Preconditions.checkNotNull(id, "id");
|
||||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
this.markAsUnused = markAsUnused;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@ -78,6 +81,12 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
||||
return interval;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Boolean getMarkAsUnused()
|
||||
{
|
||||
return markAsUnused;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
@ -88,14 +97,15 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
|
||||
return false;
|
||||
}
|
||||
ClientKillUnusedSegmentsTaskQuery that = (ClientKillUnusedSegmentsTaskQuery) o;
|
||||
return Objects.equals(id, that.id) &&
|
||||
Objects.equals(dataSource, that.dataSource) &&
|
||||
Objects.equals(interval, that.interval);
|
||||
return Objects.equals(id, that.id)
|
||||
&& Objects.equals(dataSource, that.dataSource)
|
||||
&& Objects.equals(interval, that.interval)
|
||||
&& Objects.equals(markAsUnused, that.markAsUnused);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(id, dataSource, interval);
|
||||
return Objects.hash(id, dataSource, interval, markAsUnused);
|
||||
}
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
|
||||
public void killUnusedSegments(String idPrefix, String dataSource, Interval interval)
|
||||
{
|
||||
final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
|
||||
final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval);
|
||||
final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
|
||||
runTask(taskId, taskQuery);
|
||||
}
|
||||
|
||||
|
@ -134,6 +134,16 @@ public interface IndexerMetadataStorageCoordinator
|
||||
*/
|
||||
List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);
|
||||
|
||||
/**
|
||||
* Marks as unused all segments whose data lies entirely within the given interval.
|
||||
*
|
||||
* @param dataSource The data source the segments belong to
|
||||
* @param interval Filter the data segments to ones that include data in this interval exclusively.
|
||||
*
|
||||
* @return number of segments marked unused
|
||||
*/
|
||||
int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval);
|
||||
|
||||
/**
|
||||
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
|
||||
* with identifiers already in the metadata storage will not be added).
|
||||
|
@ -226,6 +226,33 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
return matchingSegments;
|
||||
}
|
||||
|
||||
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval)
{
  // Flip used=false on every segment row of this datasource whose
  // [start, end) range lies entirely within the given interval
  // (start >= interval.start AND end <= interval.end).
  int numSegmentsMarkedUnused = connector.retryTransaction(
      (handle, status) -> {
        return handle
            .createStatement(
                StringUtils.format(
                    // "end" is a reserved word in some databases, so it is
                    // wrapped in the connector-specific quote string (%2$s).
                    "UPDATE %s SET used=false WHERE dataSource = :dataSource "
                    + "AND start >= :start AND %2$send%2$s <= :end",
                    dbTables.getSegmentsTable(),
                    connector.getQuoteString()
                )
            )
            .bind("dataSource", dataSource)
            // NOTE(review): start/end are bound as strings; presumably the
            // columns store ISO-8601 timestamps so string comparison matches
            // chronological order — confirm against the segments table schema.
            .bind("start", interval.getStart().toString())
            .bind("end", interval.getEnd().toString())
            .execute();
      },
      // assumes these are (quietTries, maxTries) for retryTransaction —
      // TODO confirm against SQLMetadataConnector.
      3,
      SQLMetadataConnector.DEFAULT_MAX_TRIES
  );

  log.info("Marked %,d segments unused for %s for interval %s.", numSegmentsMarkedUnused, dataSource, interval);
  return numSegmentsMarkedUnused;
}
|
||||
|
||||
private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
|
||||
final Handle handle,
|
||||
final String dataSource,
|
||||
|
@ -33,13 +33,14 @@ public class ClientKillUnusedSegmentsTaskQueryTest
|
||||
private static final String DATA_SOURCE = "data_source";
|
||||
public static final DateTime START = DateTimes.nowUtc();
|
||||
private static final Interval INTERVAL = new Interval(START, START.plus(1));
|
||||
private static final Boolean MARK_UNUSED = true;
|
||||
|
||||
ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery;
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL);
|
||||
clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true);
|
||||
}
|
||||
|
||||
@After
|
||||
@ -66,6 +67,12 @@ public class ClientKillUnusedSegmentsTaskQueryTest
|
||||
Assert.assertEquals(INTERVAL, clientKillUnusedSegmentsQuery.getInterval());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMarkUnused()
|
||||
{
|
||||
Assert.assertEquals(MARK_UNUSED, clientKillUnusedSegmentsQuery.getMarkAsUnused());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEquals()
|
||||
{
|
||||
|
@ -1549,4 +1549,36 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
||||
);
|
||||
Assert.assertEquals(0, deletedCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMarkSegmentsAsUnusedWithinInterval() throws IOException
|
||||
{
|
||||
coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2));
|
||||
|
||||
// interval covers existingSegment1 and partially overlaps existingSegment2,
|
||||
// only existingSegment1 will be dropped
|
||||
coordinator.markSegmentsAsUnusedWithinInterval(
|
||||
existingSegment1.getDataSource(),
|
||||
Intervals.of("1993-12-31T12Z/1994-01-02T12Z")
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableSet.of(existingSegment1),
|
||||
ImmutableSet.copyOf(
|
||||
coordinator.retrieveUnusedSegmentsForInterval(
|
||||
existingSegment1.getDataSource(),
|
||||
existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
|
||||
)
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(
|
||||
ImmutableSet.of(),
|
||||
ImmutableSet.copyOf(
|
||||
coordinator.retrieveUnusedSegmentsForInterval(
|
||||
existingSegment2.getDataSource(),
|
||||
existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user