Fix handling an empty list of versions (#16198)

* Differentiate null and empty lists of segment IDs and versions.

Treat them differently. Segment IDs and versions can be an empty list,
in which case the queries should simply not return anything. Versions are optional, so
they can be null, which means no version filter is applied, so the queries should return segments with
all possible versions. Segment IDs cannot be null, as indicated by the absence of the @Nullable
annotation.

* Update javadocs and add empty versions test to kill task.

* Add test for RetrieveSegmentsActions as well.
This commit is contained in:
Abhishek Radhakrishnan 2024-03-25 17:51:24 -07:00 committed by GitHub
parent e7dc00b86d
commit 95595ba4f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 284 additions and 20 deletions

View File

@ -120,6 +120,20 @@ public class RetrieveSegmentsActionsTest
Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments); Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments);
} }
/**
 * Performs a {@link RetrieveUnusedSegmentsAction} built with an empty (non-null) list as its third
 * argument and expects the action to return no segments at all.
 */
@Test
public void testRetrieveUnusedSegmentsActionWithEmptyVersions()
{
  // The two trailing nulls leave the remaining optional arguments unset
  // (presumably limit and max used-status-last-updated time; verify against the constructor).
  final RetrieveUnusedSegmentsAction emptyVersionsAction =
      new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, ImmutableList.of(), null, null);

  final Set<DataSegment> retrievedSegments = new HashSet<>(
      emptyVersionsAction.perform(task, actionTestKit.getTaskActionToolbox())
  );

  Assert.assertEquals(ImmutableSet.of(), retrievedSegments);
}
@Test @Test
public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime() public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
{ {

View File

@ -214,6 +214,54 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments)); Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments));
} }
/**
 * Commits five segments across three versions, marks all of them unused, and then runs a kill
 * task whose versions filter is an empty list. The task must succeed, report
 * {@code Stats(0, 1, 0)}, and leave every unused segment retrievable from the metadata store.
 */
@Test
public void testKillSegmentsWithEmptyVersions() throws Exception
{
  final DateTime baseTime = DateTimes.nowUtc();
  final String newestVersion = baseTime.toString();
  final String middleVersion = baseTime.minusHours(2).toString();
  final String oldestVersion = baseTime.minusHours(3).toString();

  // Three segments on the newest version, plus one each on the two older versions.
  final Set<DataSegment> allSegments = ImmutableSet.of(
      newSegment(Intervals.of("2019-01-01/2019-02-01"), newestVersion),
      newSegment(Intervals.of("2019-02-01/2019-03-01"), newestVersion),
      newSegment(Intervals.of("2019-03-01/2019-04-01"), newestVersion),
      newSegment(Intervals.of("2019-01-01/2019-02-01"), middleVersion),
      newSegment(Intervals.of("2019-01-01/2019-02-01"), oldestVersion)
  );

  Assert.assertEquals(allSegments, getMetadataStorageCoordinator().commitSegments(allSegments));

  // Every committed segment must be flipped to unused before the kill task runs.
  Assert.assertEquals(
      allSegments.size(),
      getSegmentsMetadataManager().markSegmentsAsUnused(
          allSegments.stream()
                     .map(DataSegment::getId)
                     .collect(Collectors.toSet())
      )
  );

  final KillUnusedSegmentsTask killTask = new KillUnusedSegmentsTaskBuilder()
      .dataSource(DATA_SOURCE)
      .interval(Intervals.of("2018/2020"))
      .versions(ImmutableList.of())
      .batchSize(3)
      .build();

  Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(killTask).get().getStatusCode());
  Assert.assertEquals(new KillTaskReport.Stats(0, 1, 0), getReportedStats());

  // Nothing may have been deleted: all segments are still present as unused.
  final List<DataSegment> remainingUnusedSegments =
      getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
          DATA_SOURCE,
          Intervals.of("2018/2020"),
          null,
          null
      );
  Assert.assertEquals(allSegments, new HashSet<>(remainingUnusedSegments));
}
@Test @Test
public void testKillSegmentsWithVersionsAndLimit() throws Exception public void testKillSegmentsWithVersionsAndLimit() throws Exception
{ {

View File

@ -159,7 +159,8 @@ public interface IndexerMetadataStorageCoordinator
* @param dataSource The data source the segments belong to * @param dataSource The data source the segments belong to
* @param interval Filter the data segments to ones that include data in this interval exclusively. * @param interval Filter the data segments to ones that include data in this interval exclusively.
* @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all * @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all
* versions of unused segments in the {@code interval} must be retrieved. * versions of unused segments in the {@code interval} must be retrieved. If an empty list is passed,
* no segments are retrieved.
* @param limit The maximum number of unused segments to retrieve. If null, no limit is applied. * @param limit The maximum number of unused segments to retrieve. If null, no limit is applied.
* @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
* with {@code used_status_last_updated} no later than this time will be included in the * with {@code used_status_last_updated} no later than this time will be included in the

View File

@ -56,7 +56,7 @@ public interface SegmentsMetadataManager
/** /**
* Marks non-overshadowed unused segments for the given interval and optional list of versions * Marks non-overshadowed unused segments for the given interval and optional list of versions
* as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval * as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval
* will be marked as used. * will be marked as used. If an empty list of versions is passed, no segments are marked as used.
* @return Number of segments updated * @return Number of segments updated
*/ */
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions); int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions);
@ -89,7 +89,8 @@ public interface SegmentsMetadataManager
/** /**
* Marks segments as unused that are <b>fully contained</b> in the given interval for an optional list of versions. * Marks segments as unused that are <b>fully contained</b> in the given interval for an optional list of versions.
* If versions are not specified, all versions of segments in the interval will be marked as unused. * If versions are not specified, all versions of segments in the interval will be marked as unused. If an empty list
* of versions is passed, no segments are marked as unused.
* Segments that are already marked as unused are not updated. * Segments that are already marked as unused are not updated.
* @return The number of segments updated * @return The number of segments updated
*/ */

View File

@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
@ -159,7 +158,8 @@ public class SqlSegmentsMetadataQuery
* @param dataSource The name of the datasource * @param dataSource The name of the datasource
* @param intervals The intervals to search over * @param intervals The intervals to search over
* @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}. * @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}.
* If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. If an
* empty list is passed, no segments are retrieved.
* @param limit The limit of segments to return * @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are > * @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexicographically if sortOrder is null or ASC, or < this segment * this segment lexicographically if sortOrder is null or ASC, or < this segment
@ -256,7 +256,7 @@ public class SqlSegmentsMetadataQuery
private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds) private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds)
{ {
if (CollectionUtils.isNullOrEmpty(segmentIds)) { if (segmentIds.isEmpty()) {
return Collections.emptyList(); return Collections.emptyList();
} }
@ -344,6 +344,10 @@ public class SqlSegmentsMetadataQuery
*/ */
public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions) public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions)
{ {
if (versions != null && versions.isEmpty()) {
return 0;
}
if (Intervals.isEternity(interval)) { if (Intervals.isEternity(interval)) {
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();
sb.append( sb.append(
@ -354,9 +358,7 @@ public class SqlSegmentsMetadataQuery
) )
); );
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); if (versions != null) {
if (hasVersions) {
sb.append(getParameterizedInConditionForColumn("version", versions)); sb.append(getParameterizedInConditionForColumn("version", versions));
} }
@ -366,7 +368,7 @@ public class SqlSegmentsMetadataQuery
.bind("used", false) .bind("used", false)
.bind("used_status_last_updated", DateTimes.nowUtc().toString()); .bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) { if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt); bindColumnValuesToQueryWithInCondition("version", versions, stmt);
} }
@ -386,9 +388,7 @@ public class SqlSegmentsMetadataQuery
) )
); );
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); if (versions != null) {
if (hasVersions) {
sb.append(getParameterizedInConditionForColumn("version", versions)); sb.append(getParameterizedInConditionForColumn("version", versions));
} }
@ -400,7 +400,7 @@ public class SqlSegmentsMetadataQuery
.bind("end", interval.getEnd().toString()) .bind("end", interval.getEnd().toString())
.bind("used_status_last_updated", DateTimes.nowUtc().toString()); .bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) { if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt); bindColumnValuesToQueryWithInCondition("version", versions, stmt);
} }
return stmt.execute(); return stmt.execute();
@ -558,6 +558,10 @@ public class SqlSegmentsMetadataQuery
@Nullable final DateTime maxUsedStatusLastUpdatedTime @Nullable final DateTime maxUsedStatusLastUpdatedTime
) )
{ {
if (versions != null && versions.isEmpty()) {
return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
}
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage( return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime) retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
@ -733,9 +737,7 @@ public class SqlSegmentsMetadataQuery
sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString())); sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString()));
} }
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); if (versions != null) {
if (hasVersions) {
sb.append(getParameterizedInConditionForColumn("version", versions)); sb.append(getParameterizedInConditionForColumn("version", versions));
} }
@ -786,7 +788,7 @@ public class SqlSegmentsMetadataQuery
bindIntervalsToQuery(sql, intervals); bindIntervalsToQuery(sql, intervals);
} }
if (hasVersions) { if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, sql); bindColumnValuesToQueryWithInCondition("version", versions, sql);
} }
@ -898,7 +900,7 @@ public class SqlSegmentsMetadataQuery
*/ */
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values) private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{ {
if (CollectionUtils.isNullOrEmpty(values)) { if (values == null) {
return ""; return "";
} }
@ -927,7 +929,7 @@ public class SqlSegmentsMetadataQuery
final SQLStatement<?> query final SQLStatement<?> query
) )
{ {
if (CollectionUtils.isNullOrEmpty(values)) { if (values == null) {
return; return;
} }

View File

@ -640,6 +640,106 @@ public class SqlSegmentsMetadataManagerTest
); );
} }
/**
 * Publishes three unused koala segments and then asks the manager to mark non-overshadowed
 * segments in {@code 2017/2018} as used with an empty list of versions. Expects zero segments to
 * be updated and the set of used segments to remain just the two wiki segments.
 */
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithEmptyVersions() throws Exception
{
  publishWikiSegments();
  sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
  sqlSegmentsMetadataManager.poll();
  Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());

  final DataSegment twoDayKoala = createSegment(
      DS.KOALA,
      "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
      "2017-10-15T20:19:12.565Z"
  );
  final DataSegment newerOneDayKoala = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      "2017-10-16T20:19:12.565Z"
  );
  // Same interval as newerOneDayKoala but an older version, hence overshadowed by it.
  final DataSegment olderOneDayKoala = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      "2017-10-15T20:19:12.565Z"
  );
  publishUnusedSegments(twoDayKoala, newerOneDayKoala, olderOneDayKoala);
  sqlSegmentsMetadataManager.poll();

  // Before the call: only the wiki segments are used.
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );

  final int numMarkedAsUsed = sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
      DS.KOALA,
      Intervals.of("2017/2018"),
      ImmutableList.of()
  );
  Assert.assertEquals(0, numMarkedAsUsed);
  sqlSegmentsMetadataManager.poll();

  // After the call: still only the wiki segments are used.
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );
}
/**
 * Same scenario as the finite-interval variant, but the request covers
 * {@code Intervals.ETERNITY}: with an empty list of versions, zero segments are expected to be
 * marked as used and the used set must remain just the two wiki segments.
 */
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithEmptyVersions() throws Exception
{
  publishWikiSegments();
  sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
  sqlSegmentsMetadataManager.poll();
  Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());

  final DataSegment twoDayKoala = createSegment(
      DS.KOALA,
      "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
      "2017-10-15T20:19:12.565Z"
  );
  final DataSegment newerOneDayKoala = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      "2017-10-16T20:19:12.565Z"
  );
  // Same interval as newerOneDayKoala but an older version, hence overshadowed by it.
  final DataSegment olderOneDayKoala = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      "2017-10-15T20:19:12.565Z"
  );
  publishUnusedSegments(twoDayKoala, newerOneDayKoala, olderOneDayKoala);
  sqlSegmentsMetadataManager.poll();

  // Before the call: only the wiki segments are used.
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );

  final int numMarkedAsUsed = sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
      DS.KOALA,
      Intervals.ETERNITY,
      ImmutableList.of()
  );
  Assert.assertEquals(0, numMarkedAsUsed);
  sqlSegmentsMetadataManager.poll();

  // After the call: still only the wiki segments are used.
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );
}
@Test @Test
public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception
{ {
@ -1045,6 +1145,104 @@ public class SqlSegmentsMetadataManagerTest
); );
} }
/**
 * Publishes three used koala segments and asks the manager to mark segments in
 * {@code 2017-10-15/2017-10-18} as unused with an empty list of versions. Expects zero segments to
 * be updated, so all wiki and koala segments must still be used afterwards.
 */
@Test
public void testMarkAsUnusedSegmentsInIntervalWithEmptyVersions() throws IOException
{
  publishWikiSegments();
  sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
  sqlSegmentsMetadataManager.poll();
  Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());

  final DateTime baseTime = DateTimes.nowUtc();
  final String currentVersion = baseTime.toString();
  final String nextDayVersion = baseTime.plus(Duration.standardDays(1)).toString();

  final DataSegment koalaOct15 = createSegment(
      DS.KOALA,
      "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
      currentVersion
  );
  final DataSegment koalaOct17 = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      nextDayVersion
  );
  final DataSegment koalaOct19 = createSegment(
      DS.KOALA,
      "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
      nextDayVersion
  );

  publisher.publishSegment(koalaOct15);
  publisher.publishSegment(koalaOct17);
  publisher.publishSegment(koalaOct19);

  final Interval targetInterval = Intervals.of("2017-10-15/2017-10-18");

  // An empty versions list must match nothing, so no segment gets updated.
  Assert.assertEquals(
      0,
      sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, targetInterval, ImmutableList.of())
  );

  sqlSegmentsMetadataManager.poll();
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2, koalaOct15, koalaOct17, koalaOct19),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );
}
/**
 * Publishes three used koala segments and asks the manager to mark segments in
 * {@code Intervals.ETERNITY} as unused with an empty list of versions. Expects zero segments to be
 * updated, so all wiki and koala segments must still be used afterwards.
 */
@Test
public void testMarkAsUnusedSegmentsInEternityIntervalWithEmptyVersions() throws IOException
{
  publishWikiSegments();
  sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
  sqlSegmentsMetadataManager.poll();
  Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());

  final DateTime now = DateTimes.nowUtc();
  final String v1 = now.toString();
  final String v2 = now.plus(Duration.standardDays(1)).toString();

  final DataSegment koalaSegment1 = createSegment(
      DS.KOALA,
      "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
      v1
  );
  final DataSegment koalaSegment2 = createSegment(
      DS.KOALA,
      "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
      v2
  );
  final DataSegment koalaSegment3 = createSegment(
      DS.KOALA,
      "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
      v2
  );

  publisher.publishSegment(koalaSegment1);
  publisher.publishSegment(koalaSegment2);
  publisher.publishSegment(koalaSegment3);

  // Use the eternity interval so this test is distinct from the finite-interval variant;
  // previously it passed 2017-10-15/2017-10-18 and so duplicated that test.
  final Interval theInterval = Intervals.ETERNITY;

  // An empty versions list must match nothing, so no segment gets updated.
  Assert.assertEquals(
      0,
      sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(
          DS.KOALA,
          theInterval,
          ImmutableList.of()
      )
  );

  sqlSegmentsMetadataManager.poll();
  Assert.assertEquals(
      ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3),
      ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
  );
}
@Test @Test
public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException
{ {