From fa8e51149216a367e9b303961c784a99e69de138 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Tue, 19 Mar 2024 09:22:25 -0700 Subject: [PATCH] Add versions to `markUsed` and `markUnused` APIs (#16141) * Mark used and unused APIs by versions. * remove the conditional invocations. * isValid() and test updates. * isValid() and tests. * Remove warning logs for invalid user requests. Also, downgrade visibility. * Update resp message, etc. * tests and some cleanup. * Docs draft * Clarify docs * Update server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java Co-authored-by: Kashif Faraz * Review comments * Remove default interface methods only used in tests and update docs. * Clarify javadocs and @Nullable. * Add more tests. * Parameterized versions. --------- Co-authored-by: Kashif Faraz --- docs/api-reference/data-management-api.md | 28 +- .../actions/MarkSegmentsAsUnusedAction.java | 5 +- .../task/KillUnusedSegmentsTaskTest.java | 21 +- .../metadata/SegmentsMetadataManager.java | 16 +- .../metadata/SqlSegmentsMetadataManager.java | 35 ++- .../metadata/SqlSegmentsMetadataQuery.java | 146 +++++++-- .../server/http/DataSourcesResource.java | 60 ++-- .../SqlSegmentsMetadataManagerTest.java | 257 +++++++++++++++- .../druid/metadata/TestDerbyConnector.java | 2 +- .../simulate/TestSegmentsMetadataManager.java | 4 +- .../server/http/DataSourcesResourceTest.java | 286 ++++++++++++++---- 11 files changed, 708 insertions(+), 152 deletions(-) diff --git a/docs/api-reference/data-management-api.md b/docs/api-reference/data-management-api.md index 4adeaa8b208..754bf62f725 100644 --- a/docs/api-reference/data-management-api.md +++ b/docs/api-reference/data-management-api.md @@ -206,7 +206,8 @@ Marks the state of a group of segments as unused, using an array of segment IDs Pass the array of segment IDs or interval as a JSON object in the request body. 
For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. -Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. #### URL @@ -214,12 +215,13 @@ Druid only updates the segments completely contained within the specified interv #### Request body -The group of segments is sent as a JSON request payload that accepts one of the following properties: +The group of segments is sent as a JSON request payload that accepts the following properties: -|Property|Description|Example| -|----------|-------------|---------| -|`interval`|ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| -|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`| +|Property|Description|Required|Example| +|----------|-------------|---------|---------| +|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| +|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`| +|`versions`|List of segment versions. Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", "2024-03-12T16:00:04.086Z"]`| #### Responses @@ -306,7 +308,8 @@ Marks the state of a group of segments as used, using an array of segment IDs or Pass the array of segment IDs or interval as a JSON object in the request body. For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. 
-Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. #### URL @@ -314,12 +317,13 @@ Druid only updates the segments completely contained within the specified interv #### Request body -The group of segments is sent as a JSON request payload that accepts one of the following properties: +The group of segments is sent as a JSON request payload that accepts the following properties: -|Property|Description|Example| -|----------|-------------|---------| -|`interval`| ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| -|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`| +|Property|Description|Required|Example| +|----------|-------------|---------|---------| +|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| +|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`| +|`versions`|List of segment versions. 
Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", "2024-03-12T16:00:04.086Z"]`| #### Responses diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java index ddf57afbc18..93cb75280fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java @@ -63,9 +63,8 @@ public class MarkSegmentsAsUnusedAction implements TaskAction @Override public Integer perform(Task task, TaskActionToolbox toolbox) { - int numMarked = toolbox.getIndexerMetadataStorageCoordinator() - .markSegmentsAsUnusedWithinInterval(dataSource, interval); - return numMarked; + return toolbox.getIndexerMetadataStorageCoordinator() + .markSegmentsAsUnusedWithinInterval(dataSource, interval); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 1e36cc825a0..382673bed4b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -387,7 +387,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase segments.size(), getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01") + Intervals.of("2018-01-01/2020-01-01"), + null ) ); @@ -434,7 +435,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment1.getInterval() + segment1.getInterval(), + null ) ); @@ -442,7 
+444,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment4.getInterval() + segment4.getInterval(), + null ) ); @@ -450,7 +453,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment3.getInterval() + segment3.getInterval(), + null ) ); @@ -508,7 +512,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment1.getInterval() + segment1.getInterval(), + null ) ); @@ -516,7 +521,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment4.getInterval() + segment4.getInterval(), + null ) ); @@ -529,7 +535,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment3.getInterval() + segment3.getInterval(), + null ) ); diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 540eba990f2..f0b9f06425d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -53,7 +53,13 @@ public interface SegmentsMetadataManager */ int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource); - int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval); + /** + * Marks non-overshadowed unused segments for the given interval and optional list of versions + * as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval + * will be marked as used. 
+ * @return Number of segments updated + */ + int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); /** * Marks the given segment IDs as "used" only if there are not already overshadowed @@ -81,7 +87,13 @@ public interface SegmentsMetadataManager */ int markAsUnusedAllSegmentsInDataSource(String dataSource); - int markAsUnusedSegmentsInInterval(String dataSource, Interval interval); + /** + * Marks segments as unused that are fully contained in the given interval for an optional list of versions. + * If versions are not specified, all versions of segments in the interval will be marked as unused. + * Segments that are already marked as unused are not updated. + * @return The number of segments updated + */ + int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); int markSegmentsAsUnused(Set segmentIds); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 4a268a2257d..66a60e072c0 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -654,21 +654,25 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager @Override public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource) { - return doMarkAsUsedNonOvershadowedSegments(dataSource, null); + return doMarkAsUsedNonOvershadowedSegments(dataSource, null, null); } @Override - public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval) + public int markAsUsedNonOvershadowedSegmentsInInterval( + final String dataSource, + final Interval interval, + @Nullable final List versions + ) { Preconditions.checkNotNull(interval); - return doMarkAsUsedNonOvershadowedSegments(dataSource, 
interval); + return doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions); } - /** - * Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null) - * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}. - */ - private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval) + private int doMarkAsUsedNonOvershadowedSegments( + final String dataSourceName, + final @Nullable Interval interval, + final @Nullable List versions + ) { final List unusedSegments = new ArrayList<>(); final SegmentTimeline timeline = new SegmentTimeline(); @@ -682,12 +686,12 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval); try (final CloseableIterator iterator = - queryTool.retrieveUsedSegments(dataSourceName, intervals)) { + queryTool.retrieveUsedSegments(dataSourceName, intervals, versions)) { timeline.addSegments(iterator); } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, versions, null, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -796,7 +800,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager private int markSegmentsAsUsed(final List segmentIds) { if (segmentIds.isEmpty()) { - log.info("No segments found to update!"); + log.info("No segments found to mark as used."); return 0; } @@ -856,13 +860,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager } @Override - public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval) + public int markAsUnusedSegmentsInInterval( + final String dataSource, + final Interval interval, + @Nullable 
final List versions + ) { + Preconditions.checkNotNull(interval); try { return connector.getDBI().withHandle( handle -> SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) - .markSegmentsUnused(dataSourceName, interval) + .markSegmentsUnused(dataSource, interval, versions) ); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 29465cd665b..5308f9dd7fe 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -42,6 +42,8 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; import javax.annotation.Nullable; import java.util.ArrayList; @@ -119,11 +121,24 @@ public class SqlSegmentsMetadataQuery final String dataSource, final Collection intervals ) + { + return retrieveUsedSegments(dataSource, intervals, null); + } + + /** + * Similar to {@link #retrieveUsedSegments}, but with an additional {@code versions} argument. When {@code versions} + * is specified, all used segments in the specified {@code intervals} and {@code versions} are retrieved. + */ + public CloseableIterator retrieveUsedSegments( + final String dataSource, + final Collection intervals, + final List versions + ) { return retrieveSegments( dataSource, intervals, - null, + versions, IntervalMode.OVERLAPS, true, null, @@ -134,7 +149,7 @@ public class SqlSegmentsMetadataQuery } /** - * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval + * Retrieves segments for a given datasource that are marked unused and that are fully contained by any interval * in a particular collection of intervals. 
If the collection of intervals is empty, this method will retrieve all * unused segments. *

@@ -184,7 +199,7 @@ public class SqlSegmentsMetadataQuery /** * Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given - * datasource that are marked unused and that are *fully contained by* any interval in a particular collection of + * datasource that are marked unused and that are fully contained by any interval in a particular collection of * intervals. If the collection of intervals is empty, this method will retrieve all unused segments. * * This call does not return any information about realtime segments. @@ -312,45 +327,83 @@ public class SqlSegmentsMetadataQuery } /** - * Marks all used segments that are *fully contained by* a particular interval as unused. + * Marks all used segments that are fully contained by a particular interval as unused. * - * @return the number of segments actually modified. + * @return Number of segments updated. */ public int markSegmentsUnused(final String dataSource, final Interval interval) + { + return markSegmentsUnused(dataSource, interval, null); + } + + /** + * Marks all used segments that are fully contained by a particular interval filtered by an optional list of versions + * as unused. + * + * @return Number of segments updated. 
+ */ + public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List versions) { if (Intervals.isEternity(interval)) { - return handle - .createStatement( - StringUtils.format( - "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " - + "WHERE dataSource = :dataSource AND used = true", - dbTables.getSegmentsTable() - ) + final StringBuilder sb = new StringBuilder(); + sb.append( + StringUtils.format( + "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " + + "WHERE dataSource = :dataSource AND used = true", + dbTables.getSegmentsTable() ) + ); + + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); + } + + final Update stmt = handle + .createStatement(sb.toString()) .bind("dataSource", dataSource) .bind("used", false) - .bind("used_status_last_updated", DateTimes.nowUtc().toString()) - .execute(); + .bind("used_status_last_updated", DateTimes.nowUtc().toString()); + + if (hasVersions) { + bindVersionsToQuery(stmt, versions); + } + + return stmt.execute(); } else if (Intervals.canCompareEndpointsAsStrings(interval) && interval.getStart().getYear() == interval.getEnd().getYear()) { // Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because // that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a // segment interval like "20001/20002".) 
- return handle - .createStatement( - StringUtils.format( - "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " - + "WHERE dataSource = :dataSource AND used = true AND %s", - dbTables.getSegmentsTable(), - IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end") - ) + final StringBuilder sb = new StringBuilder(); + sb.append( + StringUtils.format( + "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " + + "WHERE dataSource = :dataSource AND used = true AND %s", + dbTables.getSegmentsTable(), + IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end") ) + ); + + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); + } + + final Update stmt = handle + .createStatement(sb.toString()) .bind("dataSource", dataSource) .bind("used", false) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .bind("used_status_last_updated", DateTimes.nowUtc().toString()) - .execute(); + .bind("used_status_last_updated", DateTimes.nowUtc().toString()); + + if (hasVersions) { + bindVersionsToQuery(stmt, versions); + } + return stmt.execute(); } else { // Retrieve, then drop, since we can't write a WHERE clause directly. 
final List segments = ImmutableList.copyOf( @@ -358,7 +411,7 @@ public class SqlSegmentsMetadataQuery retrieveSegments( dataSource, Collections.singletonList(interval), - null, + versions, IntervalMode.CONTAINS, true, null, @@ -680,11 +733,10 @@ public class SqlSegmentsMetadataQuery appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector); } - if (!CollectionUtils.isNullOrEmpty(versions)) { - final String versionsStr = versions.stream() - .map(version -> "'" + version + "'") - .collect(Collectors.joining(",")); - sb.append(StringUtils.format(" AND version IN (%s)", versionsStr)); + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); } // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null. @@ -734,6 +786,10 @@ public class SqlSegmentsMetadataQuery bindQueryIntervals(sql, intervals); } + if (hasVersions) { + bindVersionsToQuery(sql, versions); + } + return sql; } @@ -834,6 +890,36 @@ public class SqlSegmentsMetadataQuery return numChangedSegments; } + private static String getConditionForVersions(final List versions) + { + if (CollectionUtils.isNullOrEmpty(versions)) { + return ""; + } + + final StringBuilder sb = new StringBuilder(); + + sb.append(" AND version IN ("); + for (int i = 0; i < versions.size(); i++) { + sb.append(StringUtils.format(":version%d", i)); + if (i != versions.size() - 1) { + sb.append(","); + } + } + sb.append(")"); + return sb.toString(); + } + + private static void bindVersionsToQuery(final SQLStatement query, final List versions) + { + if (CollectionUtils.isNullOrEmpty(versions)) { + return; + } + + for (int i = 0; i < versions.size(); i++) { + query.bind(StringUtils.format("version%d", i), versions.get(i)); + } + } + enum IntervalMode { CONTAINS { diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index a633640f0e2..2f4334b36ac 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -206,22 +206,21 @@ public class DataSourcesResource @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(DatasourceResourceFilter.class) public Response markAsUsedNonOvershadowedSegments( - @PathParam("dataSourceName") String dataSourceName, - MarkDataSourceSegmentsPayload payload + @PathParam("dataSourceName") final String dataSourceName, + final SegmentsToUpdateFilter payload ) { if (payload == null || !payload.isValid()) { - log.warn("Invalid request payload: [%s]", payload); return Response .status(Response.Status.BAD_REQUEST) - .entity("Invalid request payload, either interval or segmentIds array must be specified") + .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE) .build(); } else { SegmentUpdateOperation operation = () -> { - final Interval interval = payload.getInterval(); + final List versions = payload.getVersions(); if (interval != null) { - return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval); + return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval, versions); } else { final Set segmentIds = payload.getSegmentIds(); if (segmentIds == null || segmentIds.isEmpty()) { @@ -254,22 +253,22 @@ public class DataSourcesResource @Consumes(MediaType.APPLICATION_JSON) public Response markSegmentsAsUnused( @PathParam("dataSourceName") final String dataSourceName, - final MarkDataSourceSegmentsPayload payload, + final SegmentsToUpdateFilter payload, @Context final HttpServletRequest req ) { if (payload == null || !payload.isValid()) { - log.warn("Invalid request payload: [%s]", payload); return Response .status(Response.Status.BAD_REQUEST) - .entity("Invalid request payload, either 
interval or segmentIds array must be specified") + .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE) .build(); } else { SegmentUpdateOperation operation = () -> { final Interval interval = payload.getInterval(); + final List versions = payload.getVersions(); final int numUpdatedSegments; if (interval != null) { - numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval); + numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval, versions); } else { final Set segmentIds = payload.getSegmentIds() @@ -302,7 +301,7 @@ public class DataSourcesResource private static Response logAndCreateDataSourceNotFoundResponse(String dataSourceName) { - log.warn("datasource not found [%s]", dataSourceName); + log.warn("datasource[%s] not found", dataSourceName); return Response.noContent().build(); } @@ -319,7 +318,7 @@ public class DataSourcesResource .build(); } catch (Exception e) { - log.error(e, "Error occurred while updating segments for data source[%s]", dataSourceName); + log.error(e, "Error occurred while updating segments for datasource[%s]", dataSourceName); return Response .serverError() .entity(ImmutableMap.of("error", "Exception occurred.", "message", Throwables.getRootCause(e).toString())) @@ -567,9 +566,9 @@ public class DataSourcesResource private static class SegmentsLoadStatistics { - private int numPublishedSegments; - private int numUnavailableSegments; - private int numLoadedSegments; + private final int numPublishedSegments; + private final int numUnavailableSegments; + private final int numLoadedSegments; SegmentsLoadStatistics( int numPublishedSegments, @@ -991,39 +990,58 @@ public class DataSourcesResource return false; } + /** + * Either {@code interval} or {@code segmentIds} array must be specified, but not both. + * {@code versions} may be optionally specified only when {@code interval} is provided. 
+ */ @VisibleForTesting - protected static class MarkDataSourceSegmentsPayload + static class SegmentsToUpdateFilter { private final Interval interval; private final Set segmentIds; + private final List versions; + + private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "Invalid request payload. Specify either 'interval' or 'segmentIds', but not both." + + " Optionally, include 'versions' only when 'interval' is provided."; @JsonCreator - public MarkDataSourceSegmentsPayload( - @JsonProperty("interval") Interval interval, - @JsonProperty("segmentIds") Set segmentIds + public SegmentsToUpdateFilter( + @JsonProperty("interval") @Nullable Interval interval, + @JsonProperty("segmentIds") @Nullable Set segmentIds, + @JsonProperty("versions") @Nullable List versions ) { this.interval = interval; this.segmentIds = segmentIds; + this.versions = versions; } + @Nullable @JsonProperty public Interval getInterval() { return interval; } + @Nullable @JsonProperty public Set getSegmentIds() { return segmentIds; } - public boolean isValid() + @Nullable + @JsonProperty + public List getVersions() + { + return versions; + } + + private boolean isValid() { final boolean hasSegmentIds = !CollectionUtils.isNullOrEmpty(segmentIds); if (interval == null) { - return hasSegmentIds; + return hasSegmentIds && CollectionUtils.isNullOrEmpty(versions); } else { return !hasSegmentIds; } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index b177b40c587..a7128ce2714 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -41,6 +41,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NoneShardSpec; import org.hamcrest.MatcherAssert; +import 
org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; @@ -589,6 +590,156 @@ public class SqlSegmentsMetadataManagerTest ); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + 
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.of("2017-10-15/2017-10-18"), + ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonExistentVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = 
createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of("foo", "bar") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws Exception { @@ -683,7 +834,7 @@ public class SqlSegmentsMetadataManagerTest ); // 2 out of 3 segments match the interval - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -731,7 +882,7 @@ public class SqlSegmentsMetadataManagerTest ); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -787,7 +938,7 @@ public class SqlSegmentsMetadataManagerTest final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000"); // 2 out of 3 segments match the interval - Assert.assertEquals(2, 
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -796,6 +947,104 @@ public class SqlSegmentsMetadataManagerTest ); } + @Test + public void testMarkAsUnusedSegmentsInIntervalAndVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of(v1, v2) + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + 
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of("foo", "bar", "baz") + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException { @@ -822,7 +1071,7 @@ public class SqlSegmentsMetadataManagerTest final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000"); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( diff --git a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java 
b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java index b195fdad8de..7ec3152ceed 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java +++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java @@ -148,7 +148,7 @@ public class TestDerbyConnector extends DerbyConnector } /** - * A wrapper class for queries on the segments table. + * A wrapper class for updating the segments table. */ public static class SegmentsTable { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index adf12ae7054..d255d0abc7d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -87,7 +87,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager } @Override - public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval) + public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions) { return 0; } @@ -116,7 +116,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager } @Override - public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval) + public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions) { return 0; } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index bd673f078b2..f6bccbeb7da 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -19,6 +19,8 @@
package org.apache.druid.server.http; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -36,6 +38,7 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.SegmentLoadInfo; import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; @@ -734,15 +737,56 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsInterval() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andReturn(3).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) + ); + Assert.assertEquals(200, response.getStatus()); + EasyMock.verify(segmentsMetadataManager, inventoryView, server); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithVersions() + { + Interval interval = Intervals.of("2010-01-22/P1D"); + + int numUpdatedSegments = 
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("v0")) + ); + EasyMock.expect(numUpdatedSegments).andReturn(3).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + DataSourcesResource dataSourcesResource = createResource(); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("v0")) + ); + Assert.assertEquals(200, response.getStatus()); + EasyMock.verify(segmentsMetadataManager, inventoryView, server); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithNonExistentVersion() + { + Interval interval = Intervals.of("2010-01-22/P1D"); + + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("foo")) + ); + EasyMock.expect(numUpdatedSegments).andReturn(0).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + DataSourcesResource dataSourcesResource = createResource(); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("foo")) ); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -752,8 +796,9 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); 
EasyMock.expect(numUpdatedSegments).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -761,7 +806,7 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) ); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -771,8 +816,9 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsSet() { Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString()); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegments(EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegments( + EasyMock.eq("datasource1"), EasyMock.eq(segmentIds) + ); EasyMock.expect(numUpdatedSegments).andReturn(3).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -780,7 +826,7 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds) + new DataSourcesResource.SegmentsToUpdateFilter(null, segmentIds, null) ); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -790,8 +836,9 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andThrow(new RuntimeException("Error!")).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -799,7 +846,7 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) ); Assert.assertEquals(500, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -809,8 +856,9 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -818,7 +866,7 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null) ); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -826,13 +874,13 @@ public class DataSourcesResourceTest } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIds() + public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIdsAndVersions() { DataSourcesResource 
dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null) + new DataSourcesResource.SegmentsToUpdateFilter(null, null, null) ); Assert.assertEquals(400, response.getStatus()); } @@ -844,20 +892,22 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of()) + new DataSourcesResource.SegmentsToUpdateFilter( + Intervals.of("2010-01-22/P1D"), ImmutableSet.of(), null + ) ); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndNullSegmentIds() + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullInterval() { DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null) ); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -870,32 +920,99 @@ public class DataSourcesResourceTest Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1")) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1"), null) ); Assert.assertEquals(400, response.getStatus()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndEmptySegmentIds() + 
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndSegmentIdsAndVersions() { DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, ImmutableSet.of()) + new DataSourcesResource.SegmentsToUpdateFilter( + Intervals.of("2020/2030"), ImmutableSet.of("seg1"), ImmutableList.of("v1", "v2") + ) ); Assert.assertEquals(400, response.getStatus()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsNoPayload() + public void testMarkAsUsedNonOvershadowedSegmentsWithEmptySegmentIds() { DataSourcesResource dataSourcesResource = createResource(); - Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of(), null) + ); Assert.assertEquals(400, response.getStatus()); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithEmptyVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of()) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of("v1", "v2")) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullSegmentIdsAndVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response 
response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of("segment1"), ImmutableList.of("v1", "v2")) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of("v1", "v2")) + ); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndEmptyVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of()) + ); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + } + @Test public void testSegmentLoadChecksForVersion() { @@ -1039,12 +1156,13 @@ public class DataSourcesResourceTest EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); DataSourcesResource dataSourcesResource = createResource(); @@ -1068,12 +1186,13 @@ public class 
DataSourcesResourceTest EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); DataSourcesResource dataSourcesResource = createResource(); @@ -1099,16 +1218,16 @@ public class DataSourcesResourceTest .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); - DataSourcesResource dataSourcesResource = - createResource(); + DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1120,11 +1239,11 @@ public class DataSourcesResourceTest { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(1).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(1).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); 
+ final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1140,11 +1259,11 @@ public class DataSourcesResourceTest { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1159,16 +1278,15 @@ public class DataSourcesResourceTest { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)) + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)) .andThrow(new RuntimeException("Exception occurred")) .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); - DataSourcesResource dataSourcesResource = - createResource(); + DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); 
Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1179,11 +1297,12 @@ public class DataSourcesResourceTest public void testMarkAsUnusedSegmentsInIntervalNoDataSource() { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)) + .andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1193,6 +1312,59 @@ public class DataSourcesResourceTest EasyMock.verify(segmentsMetadataManager); } + @Test + public void testMarkAsUnusedSegmentsInIntervalWithVersions() + { + final Interval theInterval = Intervals.of("2010-01-01/P1D"); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("v1"))) + .andReturn(2).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("v1")); + DataSourcesResource dataSourcesResource = createResource(); + prepareRequestForAudit(); + + Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 2), response.getEntity()); + EasyMock.verify(segmentsMetadataManager); + } + + @Test + public void 
testMarkAsUnusedSegmentsInIntervalWithNonExistentVersion() + { + final Interval theInterval = Intervals.of("2010-01-01/P1D"); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("foo"))) + .andReturn(0).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("foo")); + DataSourcesResource dataSourcesResource = createResource(); + prepareRequestForAudit(); + + Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + EasyMock.verify(segmentsMetadataManager); + } + + @Test + public void testSegmentsToUpdateFilterSerde() throws JsonProcessingException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final String payload = "{\"interval\":\"2023-01-01T00:00:00.000Z/2024-01-01T00:00:00.000Z\",\"segmentIds\":null,\"versions\":[\"v1\"]}"; + + final DataSourcesResource.SegmentsToUpdateFilter obj = + mapper.readValue(payload, DataSourcesResource.SegmentsToUpdateFilter.class); + Assert.assertEquals(Intervals.of("2023/2024"), obj.getInterval()); + Assert.assertEquals(ImmutableList.of("v1"), obj.getVersions()); + Assert.assertNull(obj.getSegmentIds()); + + Assert.assertEquals(payload, mapper.writeValueAsString(obj)); + } + @Test public void testMarkSegmentsAsUnusedNullPayload() { @@ -1202,18 +1374,19 @@ public class DataSourcesResourceTest Assert.assertEquals(400, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals( - "Invalid request payload, either interval or segmentIds array must be specified", + "Invalid request payload. Specify either 'interval' or 'segmentIds', but not both." 
+ + " Optionally, include 'versions' only when 'interval' is provided.", response.getEntity() ); } @Test - public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIds() + public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIdsAndVersions() { DataSourcesResource dataSourcesResource = createResource(); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(400, response.getStatus()); @@ -1224,10 +1397,9 @@ public class DataSourcesResourceTest public void testMarkSegmentsAsUnusedWithNonNullIntervalAndEmptySegmentIds() { DataSourcesResource dataSourcesResource = createResource(); - - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of()); prepareRequestForAudit(); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-01/P1D"), ImmutableSet.of(), null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(200, response.getStatus());