mirror of https://github.com/apache/druid.git
Fix bug where numSegmentsKilled is reported incorrectly. Also, add a unit test. (#16103)
This commit is contained in:
parent
8ef3eebd30
commit
0a615f16de
|
@ -273,7 +273,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
|
||||||
|
|
||||||
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
|
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
|
||||||
numBatchesProcessed++;
|
numBatchesProcessed++;
|
||||||
numSegmentsKilled += unusedSegments.size();
|
numSegmentsKilled += segmentsToBeKilled.size();
|
||||||
|
|
||||||
LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
|
LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.druid.error.DruidException;
|
import org.apache.druid.error.DruidException;
|
||||||
import org.apache.druid.error.DruidExceptionMatcher;
|
import org.apache.druid.error.DruidExceptionMatcher;
|
||||||
|
@ -42,6 +43,7 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -118,6 +120,62 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
||||||
Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
|
Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@code segment1}, {@code segment2} and {@code segment3} have different versions, but share the same load spec.
|
||||||
|
* {@code segment1} and {@code segment2} are unused segments, while {@code segment3} is a used segment.
|
||||||
|
* When a kill task is submitted, the unused segments {@code segment1} and {@code segment2} should be deleted from the
|
||||||
|
* metadata store, but should be retained in deep storage as the load spec is used by segment {@code segment3}.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception
|
||||||
|
{
|
||||||
|
final DateTime now = DateTimes.nowUtc();
|
||||||
|
final String v1 = now.toString();
|
||||||
|
final String v2 = now.minusHours(2).toString();
|
||||||
|
final String v3 = now.minusHours(3).toString();
|
||||||
|
|
||||||
|
final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1"));
|
||||||
|
final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1"));
|
||||||
|
final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1"));
|
||||||
|
|
||||||
|
final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, segment3);
|
||||||
|
final Set<DataSegment> unusedSegments = ImmutableSet.of(segment1, segment2);
|
||||||
|
|
||||||
|
Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments));
|
||||||
|
Assert.assertEquals(
|
||||||
|
unusedSegments.size(),
|
||||||
|
getSegmentsMetadataManager().markSegmentsAsUnused(
|
||||||
|
unusedSegments.stream().map(DataSegment::getId).collect(Collectors.toSet())
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
|
||||||
|
null,
|
||||||
|
DATA_SOURCE,
|
||||||
|
Intervals.of("2018/2020"),
|
||||||
|
null,
|
||||||
|
false,
|
||||||
|
null,
|
||||||
|
100,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
|
||||||
|
Assert.assertEquals(
|
||||||
|
new KillTaskReport.Stats(0, 1, 0),
|
||||||
|
getReportedStats()
|
||||||
|
);
|
||||||
|
|
||||||
|
final List<DataSegment> observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
|
||||||
|
DATA_SOURCE,
|
||||||
|
Intervals.of("2018/2020"),
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(ImmutableSet.of(), new HashSet<>(observedUnusedSegments));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKillWithMarkUnused() throws Exception
|
public void testKillWithMarkUnused() throws Exception
|
||||||
|
@ -823,4 +881,19 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
|
||||||
10L
|
10L
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static DataSegment newSegment(Interval interval, String version, Map<String, Object> loadSpec)
|
||||||
|
{
|
||||||
|
return new DataSegment(
|
||||||
|
DATA_SOURCE,
|
||||||
|
interval,
|
||||||
|
version,
|
||||||
|
loadSpec,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
9,
|
||||||
|
10L
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue