diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 94c2fae60fd..611c19f8462 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
@@ -126,11 +127,11 @@ public interface SegmentsMetadataManager
   );
 
   /**
-   * Returns an iterable to go over un-used segments for a given datasource over an optional interval.
-   * The order in which segments are iterated is from earliest start-time, with ties being broken with earliest end-time
-   * first. Note: the iteration may not be as trivially cheap as,
-   * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
-   * iterates the returned iterable only once rather than several times.
+   * Returns an iterable to go over unused segments and their associated metadata for a given datasource over an
+   * optional interval. The order in which segments are iterated is from earliest start-time, with ties broken by
+   * earliest end-time. Note: the iteration may not be as trivially cheap as, for example, iteration over an
+   * ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned iterable only
+   * once rather than several times.
    *
    * @param datasource the name of the datasource.
    * @param interval   an optional interval to search over. If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
@@ -141,7 +142,7 @@ public interface SegmentsMetadataManager
    * @param sortOrder  an optional order with which to return the matching segments by id, start time, end time.
    *                   If none is specified, the order of the results is not guaranteed.
    */
-  Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
      String datasource,
      @Nullable Interval interval,
      @Nullable Integer limit,
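The interface method above now yields `DataSegmentPlus` rather than bare `DataSegment` objects. A minimal caller sketch, assuming a `SegmentsMetadataManager` instance; the datasource name and interval here are illustrative, not part of the patch:

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.http.DataSegmentPlus;

public class UnusedSegmentCounter
{
  // Hypothetical caller illustrating the single-pass iteration the javadoc advises:
  // the iterable may be backed by a metadata-store query, so iterate it only once.
  public static int countUnused(SegmentsMetadataManager manager)
  {
    int count = 0;
    for (DataSegmentPlus ignored : manager.iterateAllUnusedSegmentsForDatasource(
        "wikipedia",                // illustrative datasource name
        Intervals.of("2023/2024"),  // optional interval; null means eternity
        null,                       // limit
        null,                       // lastSegmentId
        null                        // sortOrder
    )) {
      count++;
    }
    return count;
  }
}
```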
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 0b423006152..88dd26c03d0 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
@@ -957,8 +958,9 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   /**
-   * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally
-   * specified interval. If the interval specified is null, this method will retrieve all unused segments.
+   * Retrieves segments and their associated metadata for a given datasource that are marked unused and that are
+   * *fully contained by* an optionally specified interval. If the interval specified is null, this method will
+   * retrieve all unused segments.
    *
    * This call does not return any information about realtime segments.
    *
@@ -976,7 +978,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * Returns an iterable.
    */
   @Override
-  public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
       final String datasource,
       @Nullable final Interval interval,
       @Nullable final Integer limit,
@@ -993,8 +995,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
         interval == null
             ? Intervals.ONLY_ETERNITY
             : Collections.singletonList(interval);
-    try (final CloseableIterator<DataSegment> iterator =
-             queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
+    try (final CloseableIterator<DataSegmentPlus> iterator =
+             queryTool.retrieveUnusedSegmentsPlus(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
       return ImmutableList.copyOf(iterator);
     }
   }
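Note that the implementation copies the iterator into an `ImmutableList` inside the try-with-resources block: the iterator is backed by a live metadata-store handle, so it must be fully consumed before it is closed. The same pattern in isolation, as a self-contained sketch (the class and method names are hypothetical):

```java
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.http.DataSegmentPlus;

public class MaterializeExample
{
  // Drain a CloseableIterator into a list before close() releases the underlying resource.
  public static List<DataSegmentPlus> drain(CloseableIterator<DataSegmentPlus> iterator)
  {
    try (CloseableIterator<DataSegmentPlus> it = iterator) {
      return ImmutableList.copyOf(it); // consume fully while the resource is open
    }
    catch (IOException e) {
      throw new RuntimeException(e); // CloseableIterator.close() may throw IOException
    }
  }
}
```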
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 3bd7c48ad05..adbfb0fda23 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
@@ -173,6 +174,49 @@ public class SqlSegmentsMetadataQuery
     );
   }
 
+  /**
+   * Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given
+   * datasource that are marked unused and that are *fully contained by* any interval in a particular collection of
+   * intervals. If the collection of intervals is empty, this method will retrieve all unused segments.
+   *
+   * This call does not return any information about realtime segments.
+   *
+   * @param dataSource                   The name of the datasource
+   * @param intervals                    The intervals to search over
+   * @param limit                        The limit of segments to return
+   * @param lastSegmentId                The last segment id from which to search for results. All segments returned
+   *                                     are > this segment lexicographically if sortOrder is null or ASC, or < this
+   *                                     segment lexicographically if sortOrder is DESC.
+   * @param sortOrder                    Specifies the order with which to return the matching segments by start time,
+   *                                     end time. A null value indicates that order does not matter.
+   * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in
+   *                                     {@code intervals} with {@code used_status_last_updated} no later than this
+   *                                     time will be included in the iterator. Segments without a
+   *                                     {@code used_status_last_updated} time (due to an upgrade from legacy Druid)
+   *                                     will have {@code maxUsedStatusLastUpdatedTime} ignored.
+   *
+   * Returns a closeable iterator. You should close it when you are done.
+   */
+  public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    return retrieveSegmentsPlus(
+        dataSource,
+        intervals,
+        IntervalMode.CONTAINS,
+        false,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime
+    );
+  }
+
   /**
    * Marks the provided segments as either used or unused.
    *
@@ -445,6 +489,54 @@ public class SqlSegmentsMetadataQuery
     }
   }
 
+  private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
+      return CloseableIterators.withEmptyBaggage(
+          retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
+      );
+    } else {
+      final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+      final List<Iterator<DataSegmentPlus>> resultingIterators = new ArrayList<>();
+      Integer limitPerBatch = limit;
+
+      for (final List<Interval> intervalList : intervalsLists) {
+        final UnmodifiableIterator<DataSegmentPlus> iterator = retrieveSegmentsPlusInIntervalsBatch(
+            dataSource,
+            intervalList,
+            matchMode,
+            used,
+            limitPerBatch,
+            lastSegmentId,
+            sortOrder,
+            maxUsedStatusLastUpdatedTime
+        );
+        if (limitPerBatch != null) {
+          // If a limit is provided, we need to shrink the limit for subsequent batches, or short-circuit if
+          // we have already reached what was requested.
+          final List<DataSegmentPlus> dataSegments = ImmutableList.copyOf(iterator);
+          resultingIterators.add(dataSegments.iterator());
+          if (dataSegments.size() >= limitPerBatch) {
+            break;
+          }
+          limitPerBatch -= dataSegments.size();
+        } else {
+          resultingIterators.add(iterator);
+        }
+      }
+      return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+    }
+  }
+
   private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
       final String dataSource,
       final Collection<Interval> intervals,
@@ -455,12 +547,73 @@ public class SqlSegmentsMetadataQuery
       @Nullable final SortOrder sortOrder,
       @Nullable final DateTime maxUsedStatusLastUpdatedTime
   )
+  {
+    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+        dataSource,
+        intervals,
+        matchMode,
+        used,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime,
+        false
+    );
+
+    final ResultIterator<DataSegment> resultIterator = getDataSegmentResultIterator(sql);
+
+    return filterDataSegmentIteratorByInterval(resultIterator, intervals, matchMode);
+  }
+
+  private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBatch(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+        dataSource,
+        intervals,
+        matchMode,
+        used,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime,
+        true
+    );
+
+    final ResultIterator<DataSegmentPlus> resultIterator = getDataSegmentPlusResultIterator(sql);
+
+    return filterDataSegmentPlusIteratorByInterval(resultIterator, intervals, matchMode);
+  }
+
+  private Query<Map<String, Object>> buildSegmentsTableQuery(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime,
+      final boolean includeExtraInfo
+  )
   {
     // Check if the intervals all support comparing as strings. If so, bake them into the SQL.
     final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
 
     final StringBuilder sb = new StringBuilder();
-    sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
+    if (includeExtraInfo) {
+      sb.append("SELECT payload, created_date, used_status_last_updated FROM %s WHERE used = :used AND dataSource = :dataSource");
+    } else {
+      sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
+    }
 
     if (compareAsString) {
       appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
@@ -513,10 +666,31 @@ public class SqlSegmentsMetadataQuery
       bindQueryIntervals(sql, intervals);
     }
 
-    final ResultIterator<DataSegment> resultIterator =
-        sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
-           .iterator();
+    return sql;
+  }
 
+  private ResultIterator<DataSegment> getDataSegmentResultIterator(Query<Map<String, Object>> sql)
+  {
+    return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
+              .iterator();
+  }
+
+  private ResultIterator<DataSegmentPlus> getDataSegmentPlusResultIterator(Query<Map<String, Object>> sql)
+  {
+    return sql.map((index, r, ctx) -> new DataSegmentPlus(
+        JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
+        DateTimes.of(r.getString(2)),
+        DateTimes.of(r.getString(3))
+    ))
+              .iterator();
+  }
+
+  private UnmodifiableIterator<DataSegment> filterDataSegmentIteratorByInterval(
+      ResultIterator<DataSegment> resultIterator,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode
+  )
+  {
     return Iterators.filter(
         resultIterator,
         dataSegment -> {
@@ -538,6 +712,33 @@ public class SqlSegmentsMetadataQuery
     );
   }
 
+  private UnmodifiableIterator<DataSegmentPlus> filterDataSegmentPlusIteratorByInterval(
+      ResultIterator<DataSegmentPlus> resultIterator,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode
+  )
+  {
+    return Iterators.filter(
+        resultIterator,
+        dataSegment -> {
+          if (intervals.isEmpty()) {
+            return true;
+          } else {
+            // Must re-check that the interval matches, even if comparing as string, because the *segment interval*
+            // might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
+            // segment interval like "20010/20011".)
+            for (Interval interval : intervals) {
+              if (matchMode.apply(interval, dataSegment.getDataSegment().getInterval())) {
+                return true;
+              }
+            }
+
+            return false;
+          }
+        }
+    );
+  }
+
   private static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
   {
     int numChangedSegments = 0;
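The limit bookkeeping in `retrieveSegmentsPlus` above — shrink the remaining limit after each batch, and stop as soon as the request is satisfied — can be seen in isolation in this self-contained sketch (generic over the element type; the class name and data are fabricated for illustration):

```java
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

public class BatchLimitSketch
{
  // Mirrors the loop in retrieveSegmentsPlus: take at most `limit` elements across
  // batches, shrinking the per-batch budget and breaking once it is exhausted.
  public static <T> List<T> takeAcrossBatches(List<List<T>> batches, Integer limit)
  {
    final List<T> result = new ArrayList<>();
    Integer limitPerBatch = limit;
    for (List<T> batch : batches) {
      if (limitPerBatch != null) {
        final List<T> taken = ImmutableList.copyOf(batch.subList(0, Math.min(batch.size(), limitPerBatch)));
        result.addAll(taken);
        if (taken.size() >= limitPerBatch) {
          break; // reached what was requested; later batches are never touched
        }
        limitPerBatch -= taken.size();
      } else {
        result.addAll(batch); // no limit: pass every batch through
      }
    }
    return result;
  }
}
```

For example, `takeAcrossBatches(ImmutableList.of(ImmutableList.of(1, 2), ImmutableList.of(3, 4)), 3)` yields `[1, 2, 3]`.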
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
new file mode 100644
index 00000000000..2a0d7fc05ea
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Encapsulates a {@link DataSegment} and additional metadata about it:
+ * {@link DataSegmentPlus#createdDate}: The time when the segment was created
+ * {@link DataSegmentPlus#usedStatusLastUpdatedDate}: The time when the segment's used status was last updated
+ * <p>
+ * The class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()} table.
+ * </p>
+ */
+@UnstableApi
+public class DataSegmentPlus
+{
+  private final DataSegment dataSegment;
+  private final DateTime createdDate;
+
+  @Nullable
+  private final DateTime usedStatusLastUpdatedDate;
+
+  @JsonCreator
+  public DataSegmentPlus(
+      @JsonProperty("dataSegment") final DataSegment dataSegment,
+      @JsonProperty("createdDate") final DateTime createdDate,
+      @JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime usedStatusLastUpdatedDate
+  )
+  {
+    this.dataSegment = dataSegment;
+    this.createdDate = createdDate;
+    this.usedStatusLastUpdatedDate = usedStatusLastUpdatedDate;
+  }
+
+  @JsonProperty
+  public DateTime getCreatedDate()
+  {
+    return createdDate;
+  }
+
+  @Nullable
+  @JsonProperty
+  public DateTime getUsedStatusLastUpdatedDate()
+  {
+    return usedStatusLastUpdatedDate;
+  }
+
+  @JsonProperty
+  public DataSegment getDataSegment()
+  {
+    return dataSegment;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DataSegmentPlus that = (DataSegmentPlus) o;
+    return Objects.equals(dataSegment, that.getDataSegment())
+           && Objects.equals(createdDate, that.getCreatedDate())
+           && Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSegment,
+        createdDate,
+        usedStatusLastUpdatedDate
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DataSegmentPlus{" +
+           "createdDate=" + getCreatedDate() +
+           ", usedStatusLastUpdatedDate=" + getUsedStatusLastUpdatedDate() +
+           ", dataSegment=" + getDataSegment() +
+           '}';
+  }
+}
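A short sketch of how the new class round-trips through Jackson. The serialized form has three top-level fields — `dataSegment`, `createdDate`, and `usedStatusLastUpdatedDate` — as the serde test later in this patch asserts. The class and method names below are illustrative:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;

public class DataSegmentPlusJsonDemo
{
  // Wraps an existing DataSegment with the two metadata timestamps and serializes it.
  public static String toJson(DataSegment segment) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    final DataSegmentPlus plus = new DataSegmentPlus(
        segment,
        DateTimes.of("2024-01-20T00:00:00.000Z"), // maps to the created_date column
        DateTimes.of("2024-01-20T01:00:00.000Z")  // maps to used_status_last_updated
    );
    return mapper.writeValueAsString(plus);
  }
}
```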
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index fb976a04bf3..c53a998e238 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -363,7 +363,7 @@ public class MetadataResource
     SortOrder theSortOrder = sortOrder == null ? null : SortOrder.fromValue(sortOrder);
 
     final Interval theInterval = interval != null ? Intervals.of(interval.replace('_', '/')) : null;
-    Iterable<DataSegment> unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
+    Iterable<DataSegmentPlus> unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
         dataSource,
         theInterval,
         limit,
@@ -371,13 +371,13 @@ public class MetadataResource
         theSortOrder
     );
 
-    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-        AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+    final Function<DataSegmentPlus, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+        AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
 
-    final Iterable<DataSegment> authorizedSegments =
+    final Iterable<DataSegmentPlus> authorizedSegments =
         AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, raGenerator, authorizerMapper);
 
-    final List<DataSegment> retVal = new ArrayList<>();
+    final List<DataSegmentPlus> retVal = new ArrayList<>();
     authorizedSegments.iterator().forEachRemaining(retVal::add);
     return Response.status(Response.Status.OK).entity(retVal).build();
   }
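With this change the unused-segments endpoint returns the wrapper objects, so each element of the response array carries the segment plus its two metadata timestamps. A hedged client-side sketch (class name hypothetical; as the serde test in this patch shows, `DataSegment` deserialization may also require `InjectableValues` for `DataSegment.PruneSpecsHolder`):

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.http.DataSegmentPlus;

public class UnusedSegmentsClientSketch
{
  // Parse the endpoint's JSON array; each element now deserializes to a
  // DataSegmentPlus rather than a bare DataSegment.
  public static List<DataSegmentPlus> parse(String responseBody) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    return mapper.readValue(responseBody, new TypeReference<List<DataSegmentPlus>>() {});
  }
}
```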
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0626792f1fd..da7bf422692 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -42,7 +42,9 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
@@ -84,6 +86,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class IndexerSQLMetadataStorageCoordinatorTest
@@ -1240,7 +1243,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1251,13 +1255,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         ImmutableList.of(),
@@ -1268,13 +1283,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2033, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
     String lastSegmentId = segments.get(9).getId().toString();
     final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
@@ -1290,6 +1316,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
     Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));
 
+    ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(expectedSegmentsAscOrder, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
     actualUnusedSegments = retrieveUnusedSegments(
         ImmutableList.of(),
         null,
@@ -1300,6 +1336,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
 
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        SortOrder.ASC,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegmentsAscOrder, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
     final List<DataSegment> expectedSegmentsDescOrder = segments.stream()
         .filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0)
         .collect(Collectors.toList());
@@ -1314,13 +1360,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
+
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        SortOrder.DESC,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegmentsDescOrder, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1331,13 +1388,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        segments.size(),
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size() - 1;
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
@@ -1347,18 +1415,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null,
         null
     );
+    final List<DataSegment> expectedSegments = segments.stream().limit(requestedLimit).collect(Collectors.toList());
     Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
-    Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+    Assert.assertTrue(actualUnusedSegments.containsAll(expectedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        requestedLimit,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(requestedLimit, actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2034, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size();
     final String lastSegmentId = segments.get(4).getId().toString();
+    final List<DataSegment> expectedSegments = segments.stream()
+        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+        .limit(requestedLimit)
+        .collect(Collectors.toList());
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
@@ -1367,20 +1451,32 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null
     );
     Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
-    Assert.assertEquals(actualUnusedSegments, segments.stream()
-        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
-        .limit(requestedLimit)
-        .collect(Collectors.toList()));
+    Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        requestedLimit,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size() - 5, actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size() - 1;
     final String lastSegmentId = segments.get(4).getId().toString();
+    final List<DataSegment> expectedSegments = segments.stream()
+        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+        .limit(requestedLimit)
+        .collect(Collectors.toList());
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
@@ -1389,17 +1485,25 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null
     );
     Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
-    Assert.assertEquals(actualUnusedSegments, segments.stream()
-        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
-        .limit(requestedLimit)
-        .collect(Collectors.toList()));
+    Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        requestedLimit,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(requestedLimit - 4, actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1410,6 +1514,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size() + 1,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
@@ -1430,13 +1544,23 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null
     );
     Assert.assertEquals(0, actualUnusedSegments.size());
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(outOfRangeInterval),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
   }
 
   @Test
   public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final Interval interval = Intervals.of("1905/1920");
 
@@ -1449,6 +1573,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(5, actualUnusedSegments1.size());
 
+    ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+    Assert.assertEquals(5, actualUnusedSegmentsPlus.size());
+
     final ImmutableList<DataSegment> actualUnusedSegments2 = retrieveUnusedSegments(
         ImmutableList.of(interval),
         null,
@@ -1457,6 +1590,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         DateTimes.nowUtc().minusHours(1)
     );
     Assert.assertEquals(0, actualUnusedSegments2.size());
+
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc().minusHours(1)
+    );
+    Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
   }
 
   @Test
@@ -1492,6 +1634,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size());
 
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus1 = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime1
+    );
+    Assert.assertEquals(oddYearSegments.size(), actualUnusedSegmentsPlus1.size());
+
     final ImmutableList<DataSegment> actualUnusedSegments2 = retrieveUnusedSegments(
         ImmutableList.of(interval),
         null,
@@ -1500,6 +1651,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         maxUsedStatusLastUpdatedTime2
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments2.size());
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus2 = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime2
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus2.size());
   }
 
   @Test
@@ -3343,6 +3503,64 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  private ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+      final List<Interval> intervals,
+      final Integer limit,
+      final String lastSegmentId,
+      final SortOrder sortOrder,
+      final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    return derbyConnector.inReadOnlyTransaction(
+        (handle, status) -> {
+          try (final CloseableIterator<DataSegmentPlus> iterator =
+                   SqlSegmentsMetadataQuery.forHandle(
+                       handle,
+                       derbyConnector,
+                       derbyConnectorRule.metadataTablesConfigSupplier().get(),
+                       mapper
+                   )
+                   .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+  }
+
+  private void verifyContainsAllSegmentsPlus(
+      List<DataSegment> expectedSegments,
+      List<DataSegmentPlus> actualUnusedSegmentsPlus,
+      DateTime usedStatusLastUpdatedTime
+  )
+  {
+    Map<SegmentId, DataSegment> expectedIdToSegment = expectedSegments.stream().collect(Collectors.toMap(DataSegment::getId, Function.identity()));
+    Map<SegmentId, DataSegmentPlus> actualIdToSegmentPlus = actualUnusedSegmentsPlus.stream()
+        .collect(Collectors.toMap(d -> d.getDataSegment().getId(), Function.identity()));
+    Assert.assertTrue(expectedIdToSegment.entrySet().stream().allMatch(e -> {
+      DataSegmentPlus segmentPlus = actualIdToSegmentPlus.get(e.getKey());
+      return segmentPlus != null
+             && !segmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+             && segmentPlus.getUsedStatusLastUpdatedDate() != null
+             && segmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime);
+    }));
+  }
+
+  private void verifyEqualsAllSegmentsPlus(
+      List<DataSegment> expectedSegments,
+      List<DataSegmentPlus> actualUnusedSegmentsPlus,
+      DateTime usedStatusLastUpdatedTime
+  )
+  {
+    Assert.assertEquals(expectedSegments.size(), actualUnusedSegmentsPlus.size());
+    for (int i = 0; i < expectedSegments.size(); i++) {
+      DataSegment expectedSegment = expectedSegments.get(i);
+      DataSegmentPlus actualSegmentPlus = actualUnusedSegmentsPlus.get(i);
+      Assert.assertEquals(expectedSegment.getId(), actualSegmentPlus.getDataSegment().getId());
+      Assert.assertTrue(!actualSegmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+                        && actualSegmentPlus.getUsedStatusLastUpdatedDate() != null
+                        && actualSegmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime));
+    }
+  }
+
   /**
    * This test-only shard type is to test the behavior of "old generation" tombstones with 1 core partition.
    */
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 1e2f2d462c4..92a40b9a443 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.metadata.SortOrder;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
@@ -194,7 +195,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   @Override
-  public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
       String datasource,
       @Nullable Interval interval,
       @Nullable Integer limit,
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
new file mode 100644
index 00000000000..f2c9a68a8d5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.timeline.CompactionState; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Map; + +public class DataSegmentPlusTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + private static final int TEST_VERSION = 0x9; + + @Before + public void setUp() + { + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); + MAPPER.setInjectableValues(injectableValues); + } + @Test + public void testEquals() + { + EqualsVerifier.forClass(DataSegmentPlus.class) + .withNonnullFields("dataSegment", "createdDate") + .usingGetClass() + .verify(); + } + + @Test + public void testSerde() throws JsonProcessingException + { + final Interval interval = Intervals.of("2011-10-01/2011-10-02"); + final ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + String createdDateStr = "2024-01-20T00:00:00.701Z"; + String usedStatusLastUpdatedDateStr = "2024-01-20T01:00:00.701Z"; + DateTime createdDate = DateTimes.of(createdDateStr); + DateTime usedStatusLastUpdatedDate = DateTimes.of(usedStatusLastUpdatedDateStr); + DataSegmentPlus segmentPlus = new DataSegmentPlus( + new DataSegment( + "something", + interval, + "1", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + new NumberedShardSpec(3, 0), + new CompactionState( + new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", "foo")) + ), + ImmutableList.of(ImmutableMap.of("type", "count", "name", "count")), + ImmutableMap.of("filter", ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")), + ImmutableMap.of(), + ImmutableMap.of() + ), + TEST_VERSION, + 1 + ), + createdDate, + usedStatusLastUpdatedDate + ); + + final Map objectMap = MAPPER.readValue( + MAPPER.writeValueAsString(segmentPlus), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + Assert.assertEquals(3, objectMap.size()); + final Map segmentObjectMap = MAPPER.readValue( + MAPPER.writeValueAsString(segmentPlus.getDataSegment()), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + // verify dataSegment + Assert.assertEquals(11, segmentObjectMap.size()); + Assert.assertEquals("something", segmentObjectMap.get("dataSource")); + Assert.assertEquals(interval.toString(), segmentObjectMap.get("interval")); + Assert.assertEquals("1", segmentObjectMap.get("version")); + Assert.assertEquals(loadSpec, segmentObjectMap.get("loadSpec")); + 
Assert.assertEquals("dim1,dim2", segmentObjectMap.get("dimensions")); + Assert.assertEquals("met1,met2", segmentObjectMap.get("metrics")); + Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), segmentObjectMap.get("shardSpec")); + Assert.assertEquals(TEST_VERSION, segmentObjectMap.get("binaryVersion")); + Assert.assertEquals(1, segmentObjectMap.get("size")); + Assert.assertEquals(6, ((Map) segmentObjectMap.get("lastCompactionState")).size()); + + // verify extra metadata + Assert.assertEquals(createdDateStr, objectMap.get("createdDate")); + Assert.assertEquals(usedStatusLastUpdatedDateStr, objectMap.get("usedStatusLastUpdatedDate")); + + DataSegmentPlus deserializedSegmentPlus = MAPPER.readValue(MAPPER.writeValueAsString(segmentPlus), DataSegmentPlus.class); + + // verify dataSegment + Assert.assertEquals(segmentPlus.getDataSegment().getDataSource(), deserializedSegmentPlus.getDataSegment().getDataSource()); + Assert.assertEquals(segmentPlus.getDataSegment().getInterval(), deserializedSegmentPlus.getDataSegment().getInterval()); + Assert.assertEquals(segmentPlus.getDataSegment().getVersion(), deserializedSegmentPlus.getDataSegment().getVersion()); + Assert.assertEquals(segmentPlus.getDataSegment().getLoadSpec(), deserializedSegmentPlus.getDataSegment().getLoadSpec()); + Assert.assertEquals(segmentPlus.getDataSegment().getDimensions(), deserializedSegmentPlus.getDataSegment().getDimensions()); + Assert.assertEquals(segmentPlus.getDataSegment().getMetrics(), deserializedSegmentPlus.getDataSegment().getMetrics()); + Assert.assertEquals(segmentPlus.getDataSegment().getShardSpec(), deserializedSegmentPlus.getDataSegment().getShardSpec()); + Assert.assertEquals(segmentPlus.getDataSegment().getSize(), deserializedSegmentPlus.getDataSegment().getSize()); + Assert.assertEquals(segmentPlus.getDataSegment().getId(), deserializedSegmentPlus.getDataSegment().getId()); + Assert.assertEquals(segmentPlus.getDataSegment().getLastCompactionState(), deserializedSegmentPlus.getDataSegment().getLastCompactionState()); + + // verify extra metadata + Assert.assertEquals(segmentPlus.getCreatedDate(), deserializedSegmentPlus.getCreatedDate()); + Assert.assertEquals(segmentPlus.getUsedStatusLastUpdatedDate(), deserializedSegmentPlus.getUsedStatusLastUpdatedDate()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index 8c818c918f7..fa99a411d36 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -27,6 +27,7 @@ import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; @@ -74,6 +75,10 @@ public class MetadataResourceTest .withNumPartitions(NUM_PARTITIONS) .eachOfSizeInMb(500) .toArray(new DataSegment[0]); + + private final List segmentsPlus = Arrays.stream(segments) + .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc())) + .collect(Collectors.toList()); private HttpServletRequest request; private 
   private SegmentsMetadataManager segmentsMetadataManager;
   private IndexerMetadataStorageCoordinator storageCoordinator;
@@ -281,7 +286,7 @@ public class MetadataResourceTest
         null,
         null
     );
-    List<DataSegment> resultList = extractResponseList(response);
+    List<DataSegmentPlus> resultList = extractResponseList(response);
     Assert.assertTrue(resultList.isEmpty());
 
     // test valid datasource with bad limit - fails with expected invalid limit message
@@ -302,7 +307,7 @@ public class MetadataResourceTest
     response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, null, null);
     resultList = extractResponseList(response);
-    Assert.assertEquals(Arrays.asList(segments), resultList);
+    Assert.assertEquals(segmentsPlus, resultList);
 
     // test valid datasource with interval filter - returns all unused segments for that datasource within interval
     int numDays = 2;
@@ -311,7 +316,10 @@ public class MetadataResourceTest
     resultList = extractResponseList(response);
     Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size());
-    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2], segments[3]), resultList);
+    Assert.assertEquals(
+        Arrays.asList(segmentsPlus.get(0), segmentsPlus.get(1), segmentsPlus.get(2), segmentsPlus.get(3)),
+        resultList
+    );
 
     // test valid datasource with interval filter, limit, and last segment id - returns unused segments for that
     // datasource within interval up to limit, starting at last segment id
@@ -320,7 +328,7 @@ public class MetadataResourceTest
     resultList = extractResponseList(response);
     Assert.assertEquals(limit, resultList.size());
-    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]), resultList);
+    Assert.assertEquals(Arrays.asList(segmentsPlus.get(0), segmentsPlus.get(1), segmentsPlus.get(2)), resultList);
 
     // test valid datasource with interval filter, limit, and offset - returns unused segments for that datasource
     // within interval up to limit, starting at offset
@@ -334,10 +342,10 @@ public class MetadataResourceTest
     );
     resultList = extractResponseList(response);
-    Assert.assertEquals(Collections.singletonList(segments[3]), resultList);
+    Assert.assertEquals(Collections.singletonList(segmentsPlus.get(3)), resultList);
   }
 
-  Answer<Iterable<DataSegment>> mockIterateAllUnusedSegmentsForDatasource()
+  Answer<Iterable<DataSegmentPlus>> mockIterateAllUnusedSegmentsForDatasource()
   {
     return invocationOnMock -> {
       String dataSourceName = invocationOnMock.getArgument(0);
@@ -349,16 +357,17 @@ public class MetadataResourceTest
         return ImmutableList.of();
       }
 
-      return Arrays.stream(segments)
-                   .filter(d -> d.getDataSource().equals(dataSourceName)
+      return segmentsPlus.stream()
+                   .filter(d -> d.getDataSegment().getDataSource().equals(dataSourceName)
                                 && (interval == null
-                                    || (d.getInterval().getStartMillis() >= interval.getStartMillis()
-                                        && d.getInterval().getEndMillis() <= interval.getEndMillis()))
+                                    || (d.getDataSegment().getInterval().getStartMillis() >= interval.getStartMillis()
+                                        && d.getDataSegment().getInterval().getEndMillis() <= interval.getEndMillis()))
                                 && (lastSegmentId == null
-                                    || (sortOrder == null && d.getId().toString().compareTo(lastSegmentId) > 0)
-                                    || (sortOrder == SortOrder.ASC && d.getId().toString().compareTo(lastSegmentId) > 0)
-                                    || (sortOrder == SortOrder.DESC && d.getId().toString().compareTo(lastSegmentId) < 0)))
-                   .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval()))
+                                    || (sortOrder == null && d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+                                    || (sortOrder == SortOrder.ASC && d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+                                    || (sortOrder == SortOrder.DESC && d.getDataSegment().getId().toString().compareTo(lastSegmentId) < 0)))
+                   .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd()
+                                                  .compare(o1.getDataSegment().getInterval(), o2.getDataSegment().getInterval()))
                    .limit(limit != null ? limit : segments.length)