Add versions to `markUsed` and `markUnused` APIs (#16141)

* Mark used and unused APIs by versions.

* remove the conditional invocations.

* isValid() and test updates.

* isValid() and tests.

* Remove warning logs for invalid user requests. Also, downgrade visibility.

* Update resp message, etc.

* tests and some cleanup.

* Docs draft

* Clarify docs

* Update server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Review comments

* Remove default interface methods only used in tests and update docs.

* Clarify javadocs and @Nullable.

* Add more tests.

* Parameterized versions.

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
This commit is contained in:
Abhishek Radhakrishnan 2024-03-19 09:22:25 -07:00 committed by GitHub
parent 1ad489a2ae
commit fa8e511492
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 708 additions and 152 deletions

View File

@ -206,7 +206,8 @@ Marks the state of a group of segments as unused, using an array of segment IDs
Pass the array of segment IDs or interval as a JSON object in the request body.
For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time.
Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected.
Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained
within the specified interval that match the optional list of versions; partially overlapping segments are not affected.
#### URL
@ -214,12 +215,13 @@ Druid only updates the segments completely contained within the specified interv
#### Request body
The group of segments is sent as a JSON request payload that accepts one of the following properties:
The group of segments is sent as a JSON request payload that accepts the following properties:
|Property|Description|Example|
|----------|-------------|---------|
|`interval`|ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`|
|Property|Description|Required|Example|
|----------|-------------|---------|---------|
|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`|
|`versions`|List of segment versions. Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", ""2024-03-12T16:00:04.086Z"]`|
#### Responses
@ -306,7 +308,8 @@ Marks the state of a group of segments as used, using an array of segment IDs or
Pass the array of segment IDs or interval as a JSON object in the request body.
For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time.
Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected.
Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained
within the specified interval that match the optional list of versions; partially overlapping segments are not affected.
#### URL
@ -314,12 +317,13 @@ Druid only updates the segments completely contained within the specified interv
#### Request body
The group of segments is sent as a JSON request payload that accepts one of the following properties:
The group of segments is sent as a JSON request payload that accepts the following properties:
|Property|Description|Example|
|----------|-------------|---------|
|`interval`| ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`|
|Property|Description|Required|Example|
|----------|-------------|---------|---------|
|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`|
|`versions`|List of segment versions. Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", ""2024-03-12T16:00:04.086Z"]`|
#### Responses

View File

@ -63,9 +63,8 @@ public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
@Override
public Integer perform(Task task, TaskActionToolbox toolbox)
{
int numMarked = toolbox.getIndexerMetadataStorageCoordinator()
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
return numMarked;
return toolbox.getIndexerMetadataStorageCoordinator()
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
}
@Override

View File

@ -387,7 +387,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
segments.size(),
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
Intervals.of("2018-01-01/2020-01-01")
Intervals.of("2018-01-01/2020-01-01"),
null
)
);
@ -434,7 +435,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment1.getInterval()
segment1.getInterval(),
null
)
);
@ -442,7 +444,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment4.getInterval()
segment4.getInterval(),
null
)
);
@ -450,7 +453,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment3.getInterval()
segment3.getInterval(),
null
)
);
@ -508,7 +512,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment1.getInterval()
segment1.getInterval(),
null
)
);
@ -516,7 +521,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment4.getInterval()
segment4.getInterval(),
null
)
);
@ -529,7 +535,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
segment3.getInterval()
segment3.getInterval(),
null
)
);

View File

@ -53,7 +53,13 @@ public interface SegmentsMetadataManager
*/
int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource);
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval);
/**
* Marks non-overshadowed unused segments for the given interval and optional list of versions
* as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval
* will be marked as used.
* @return Number of segments updated
*/
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions);
/**
* Marks the given segment IDs as "used" only if there are not already overshadowed
@ -81,7 +87,13 @@ public interface SegmentsMetadataManager
*/
int markAsUnusedAllSegmentsInDataSource(String dataSource);
int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);
/**
* Marks segments as unused that are <b>fully contained</b> in the given interval for an optional list of versions.
* If versions are not specified, all versions of segments in the interval will be marked as unused.
* Segments that are already marked as unused are not updated.
* @return The number of segments updated
*/
int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions);
int markSegmentsAsUnused(Set<SegmentId> segmentIds);

View File

@ -654,21 +654,25 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
@Override
public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource)
{
return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
return doMarkAsUsedNonOvershadowedSegments(dataSource, null, null);
}
@Override
public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval)
public int markAsUsedNonOvershadowedSegmentsInInterval(
final String dataSource,
final Interval interval,
@Nullable final List<String> versions
)
{
Preconditions.checkNotNull(interval);
return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
return doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions);
}
/**
* Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null)
* and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
*/
private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
private int doMarkAsUsedNonOvershadowedSegments(
final String dataSourceName,
final @Nullable Interval interval,
final @Nullable List<String> versions
)
{
final List<DataSegment> unusedSegments = new ArrayList<>();
final SegmentTimeline timeline = new SegmentTimeline();
@ -682,12 +686,12 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUsedSegments(dataSourceName, intervals)) {
queryTool.retrieveUsedSegments(dataSourceName, intervals, versions)) {
timeline.addSegments(iterator);
}
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null, null)) {
queryTool.retrieveUnusedSegments(dataSourceName, intervals, versions, null, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@ -796,7 +800,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
private int markSegmentsAsUsed(final List<SegmentId> segmentIds)
{
if (segmentIds.isEmpty()) {
log.info("No segments found to update!");
log.info("No segments found to mark as used.");
return 0;
}
@ -856,13 +860,18 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
}
@Override
public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval)
public int markAsUnusedSegmentsInInterval(
final String dataSource,
final Interval interval,
@Nullable final List<String> versions
)
{
Preconditions.checkNotNull(interval);
try {
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegmentsUnused(dataSourceName, interval)
.markSegmentsUnused(dataSource, interval, versions)
);
}
catch (Exception e) {

View File

@ -42,6 +42,8 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.SQLStatement;
import org.skife.jdbi.v2.Update;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -119,11 +121,24 @@ public class SqlSegmentsMetadataQuery
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveUsedSegments(dataSource, intervals, null);
}
/**
* Similar to {@link #retrieveUsedSegments}, but with an additional {@code versions} argument. When {@code versions}
* is specified, all used segments in the specified {@code intervals} and {@code versions} are retrieved.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals,
final List<String> versions
)
{
return retrieveSegments(
dataSource,
intervals,
null,
versions,
IntervalMode.OVERLAPS,
true,
null,
@ -134,7 +149,7 @@ public class SqlSegmentsMetadataQuery
}
/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval
* Retrieves segments for a given datasource that are marked unused and that are <b>fully contained by</b> any interval
* in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all
* unused segments.
* <p>
@ -184,7 +199,7 @@ public class SqlSegmentsMetadataQuery
/**
* Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given
* datasource that are marked unused and that are *fully contained by* any interval in a particular collection of
* datasource that are marked unused and that are <b>fully contained by</b> any interval in a particular collection of
* intervals. If the collection of intervals is empty, this method will retrieve all unused segments.
*
* This call does not return any information about realtime segments.
@ -312,45 +327,83 @@ public class SqlSegmentsMetadataQuery
}
/**
* Marks all used segments that are *fully contained by* a particular interval as unused.
* Marks all used segments that are <b>fully contained by</b> a particular interval as unused.
*
* @return the number of segments actually modified.
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval)
{
return markSegmentsUnused(dataSource, interval, null);
}
/**
* Marks all used segments that are <b>fully contained by</b> a particular interval filtered by an optional list of versions
* as unused.
*
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions)
{
if (Intervals.isEternity(interval)) {
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true",
dbTables.getSegmentsTable()
)
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true",
dbTables.getSegmentsTable()
)
);
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
sb.append(getConditionForVersions(versions));
}
final Update stmt = handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
.execute();
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) {
bindVersionsToQuery(stmt, versions);
}
return stmt.execute();
} else if (Intervals.canCompareEndpointsAsStrings(interval)
&& interval.getStart().getYear() == interval.getEnd().getYear()) {
// Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because
// that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a
// segment interval like "20001/20002".)
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
);
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
sb.append(getConditionForVersions(versions));
}
final Update stmt = handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
.execute();
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) {
bindVersionsToQuery(stmt, versions);
}
return stmt.execute();
} else {
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
@ -358,7 +411,7 @@ public class SqlSegmentsMetadataQuery
retrieveSegments(
dataSource,
Collections.singletonList(interval),
null,
versions,
IntervalMode.CONTAINS,
true,
null,
@ -680,11 +733,10 @@ public class SqlSegmentsMetadataQuery
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}
if (!CollectionUtils.isNullOrEmpty(versions)) {
final String versionsStr = versions.stream()
.map(version -> "'" + version + "'")
.collect(Collectors.joining(","));
sb.append(StringUtils.format(" AND version IN (%s)", versionsStr));
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
sb.append(getConditionForVersions(versions));
}
// Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
@ -734,6 +786,10 @@ public class SqlSegmentsMetadataQuery
bindQueryIntervals(sql, intervals);
}
if (hasVersions) {
bindVersionsToQuery(sql, versions);
}
return sql;
}
@ -834,6 +890,36 @@ public class SqlSegmentsMetadataQuery
return numChangedSegments;
}
private static String getConditionForVersions(final List<String> versions)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
return "";
}
final StringBuilder sb = new StringBuilder();
sb.append(" AND version IN (");
for (int i = 0; i < versions.size(); i++) {
sb.append(StringUtils.format(":version%d", i));
if (i != versions.size() - 1) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}
private static void bindVersionsToQuery(final SQLStatement query, final List<String> versions)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
return;
}
for (int i = 0; i < versions.size(); i++) {
query.bind(StringUtils.format("version%d", i), versions.get(i));
}
}
enum IntervalMode
{
CONTAINS {

View File

@ -206,22 +206,21 @@ public class DataSourcesResource
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response markAsUsedNonOvershadowedSegments(
@PathParam("dataSourceName") String dataSourceName,
MarkDataSourceSegmentsPayload payload
@PathParam("dataSourceName") final String dataSourceName,
final SegmentsToUpdateFilter payload
)
{
if (payload == null || !payload.isValid()) {
log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
.entity("Invalid request payload, either interval or segmentIds array must be specified")
.entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE)
.build();
} else {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final List<String> versions = payload.getVersions();
if (interval != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval, versions);
} else {
final Set<String> segmentIds = payload.getSegmentIds();
if (segmentIds == null || segmentIds.isEmpty()) {
@ -254,22 +253,22 @@ public class DataSourcesResource
@Consumes(MediaType.APPLICATION_JSON)
public Response markSegmentsAsUnused(
@PathParam("dataSourceName") final String dataSourceName,
final MarkDataSourceSegmentsPayload payload,
final SegmentsToUpdateFilter payload,
@Context final HttpServletRequest req
)
{
if (payload == null || !payload.isValid()) {
log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
.entity("Invalid request payload, either interval or segmentIds array must be specified")
.entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE)
.build();
} else {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final List<String> versions = payload.getVersions();
final int numUpdatedSegments;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval, versions);
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
@ -302,7 +301,7 @@ public class DataSourcesResource
private static Response logAndCreateDataSourceNotFoundResponse(String dataSourceName)
{
log.warn("datasource not found [%s]", dataSourceName);
log.warn("datasource[%s] not found", dataSourceName);
return Response.noContent().build();
}
@ -319,7 +318,7 @@ public class DataSourcesResource
.build();
}
catch (Exception e) {
log.error(e, "Error occurred while updating segments for data source[%s]", dataSourceName);
log.error(e, "Error occurred while updating segments for datasource[%s]", dataSourceName);
return Response
.serverError()
.entity(ImmutableMap.of("error", "Exception occurred.", "message", Throwables.getRootCause(e).toString()))
@ -567,9 +566,9 @@ public class DataSourcesResource
private static class SegmentsLoadStatistics
{
private int numPublishedSegments;
private int numUnavailableSegments;
private int numLoadedSegments;
private final int numPublishedSegments;
private final int numUnavailableSegments;
private final int numLoadedSegments;
SegmentsLoadStatistics(
int numPublishedSegments,
@ -991,39 +990,58 @@ public class DataSourcesResource
return false;
}
/**
* Either {@code interval} or {@code segmentIds} array must be specified, but not both.
* {@code versions} may be optionally specified only when {@code interval} is provided.
*/
@VisibleForTesting
protected static class MarkDataSourceSegmentsPayload
static class SegmentsToUpdateFilter
{
private final Interval interval;
private final Set<String> segmentIds;
private final List<String> versions;
private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "Invalid request payload. Specify either 'interval' or 'segmentIds', but not both."
+ " Optionally, include 'versions' only when 'interval' is provided.";
@JsonCreator
public MarkDataSourceSegmentsPayload(
@JsonProperty("interval") Interval interval,
@JsonProperty("segmentIds") Set<String> segmentIds
public SegmentsToUpdateFilter(
@JsonProperty("interval") @Nullable Interval interval,
@JsonProperty("segmentIds") @Nullable Set<String> segmentIds,
@JsonProperty("versions") @Nullable List<String> versions
)
{
this.interval = interval;
this.segmentIds = segmentIds;
this.versions = versions;
}
@Nullable
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Nullable
@JsonProperty
public Set<String> getSegmentIds()
{
return segmentIds;
}
public boolean isValid()
@Nullable
@JsonProperty
public List<String> getVersions()
{
return versions;
}
private boolean isValid()
{
final boolean hasSegmentIds = !CollectionUtils.isNullOrEmpty(segmentIds);
if (interval == null) {
return hasSegmentIds;
return hasSegmentIds && CollectionUtils.isNullOrEmpty(versions);
} else {
return !hasSegmentIds;
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -589,6 +590,156 @@ public class SqlSegmentsMetadataManagerTest
);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions() throws Exception
{
publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final DataSegment koalaSegment1 = createSegment(
DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
final DataSegment koalaSegment2 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-16T20:19:12.565Z"
);
// Overshadowed by koalaSegment2
final DataSegment koalaSegment3 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
Assert.assertEquals(
2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
DS.KOALA,
Intervals.ETERNITY,
ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z")
)
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception
{
publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final DataSegment koalaSegment1 = createSegment(
DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
final DataSegment koalaSegment2 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-16T20:19:12.565Z"
);
// Overshadowed by koalaSegment2
final DataSegment koalaSegment3 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
Assert.assertEquals(
2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
DS.KOALA,
Intervals.of("2017-10-15/2017-10-18"),
ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z")
)
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonExistentVersions() throws Exception
{
publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final DataSegment koalaSegment1 = createSegment(
DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
final DataSegment koalaSegment2 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-16T20:19:12.565Z"
);
// Overshadowed by koalaSegment2
final DataSegment koalaSegment3 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
"2017-10-15T20:19:12.565Z"
);
publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
Assert.assertEquals(
0,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
DS.KOALA,
Intervals.ETERNITY,
ImmutableList.of("foo", "bar")
)
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws Exception
{
@ -683,7 +834,7 @@ public class SqlSegmentsMetadataManagerTest
);
// 2 out of 3 segments match the interval
Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval));
Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@ -731,7 +882,7 @@ public class SqlSegmentsMetadataManagerTest
);
// 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused
Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval));
Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@ -787,7 +938,7 @@ public class SqlSegmentsMetadataManagerTest
final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
// 2 out of 3 segments match the interval
Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval));
Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@ -796,6 +947,104 @@ public class SqlSegmentsMetadataManagerTest
);
}
@Test
public void testMarkAsUnusedSegmentsInIntervalAndVersions() throws IOException
{
publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final DateTime now = DateTimes.nowUtc();
final String v1 = now.toString();
final String v2 = now.plus(Duration.standardDays(1)).toString();
final DataSegment koalaSegment1 = createSegment(
DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
v1
);
final DataSegment koalaSegment2 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
v2
);
final DataSegment koalaSegment3 = createSegment(
DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
v2
);
publisher.publishSegment(koalaSegment1);
publisher.publishSegment(koalaSegment2);
publisher.publishSegment(koalaSegment3);
final Interval theInterval = Intervals.of("2017-10-15/2017-10-18");
Assert.assertEquals(
2,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(
DS.KOALA,
theInterval,
ImmutableList.of(v1, v2)
)
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions() throws IOException
{
publishWikiSegments();
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
final DateTime now = DateTimes.nowUtc();
final String v1 = now.toString();
final String v2 = now.plus(Duration.standardDays(1)).toString();
final DataSegment koalaSegment1 = createSegment(
DS.KOALA,
"2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
v1
);
final DataSegment koalaSegment2 = createSegment(
DS.KOALA,
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
v2
);
final DataSegment koalaSegment3 = createSegment(
DS.KOALA,
"2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
v2
);
publisher.publishSegment(koalaSegment1);
publisher.publishSegment(koalaSegment2);
publisher.publishSegment(koalaSegment3);
final Interval theInterval = Intervals.of("2017-10-15/2017-10-18");
Assert.assertEquals(
0,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(
DS.KOALA,
theInterval,
ImmutableList.of("foo", "bar", "baz")
)
);
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
);
}
@Test
public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException
{
@ -822,7 +1071,7 @@ public class SqlSegmentsMetadataManagerTest
final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
// 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused
Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval));
Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(

View File

@ -148,7 +148,7 @@ public class TestDerbyConnector extends DerbyConnector
}
/**
* A wrapper class for queries on the segments table.
* A wrapper class for updating the segments table.
*/
public static class SegmentsTable
{

View File

@ -87,7 +87,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
}
@Override
public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval)
public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions)
{
return 0;
}
@ -116,7 +116,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
}
@Override
public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval)
public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions)
{
return 0;
}

View File

@ -19,6 +19,8 @@
package org.apache.druid.server.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -36,6 +38,7 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
@ -734,15 +737,56 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsInterval()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval));
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
);
EasyMock.expect(numUpdatedSegments).andReturn(3).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithVersions()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("v0"))
);
EasyMock.expect(numUpdatedSegments).andReturn(3).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("v0"))
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithNonExistentVersion()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("foo"))
);
EasyMock.expect(numUpdatedSegments).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("foo"))
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@ -752,8 +796,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval));
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
);
EasyMock.expect(numUpdatedSegments).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@ -761,7 +806,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
);
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@ -771,8 +816,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsSet()
{
Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString());
int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegments(EasyMock.eq("datasource1"), EasyMock.eq(segmentIds));
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegments(
EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)
);
EasyMock.expect(numUpdatedSegments).andReturn(3).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@ -780,7 +826,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds)
new DataSourcesResource.SegmentsToUpdateFilter(null, segmentIds, null)
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@ -790,8 +836,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsIntervalException()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval));
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
);
EasyMock.expect(numUpdatedSegments).andThrow(new RuntimeException("Error!")).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@ -799,7 +846,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
);
Assert.assertEquals(500, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@ -809,8 +856,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource()
{
Interval interval = Intervals.of("2010-01-22/P1D");
int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval));
int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
);
EasyMock.expect(numUpdatedSegments).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@ -818,7 +866,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null)
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null)
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@ -826,13 +874,13 @@ public class DataSourcesResourceTest
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIds()
public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null)
new DataSourcesResource.SegmentsToUpdateFilter(null, null, null)
);
Assert.assertEquals(400, response.getStatus());
}
@ -844,20 +892,22 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of())
new DataSourcesResource.SegmentsToUpdateFilter(
Intervals.of("2010-01-22/P1D"), ImmutableSet.of(), null
)
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndNullSegmentIds()
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullInterval()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null)
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null)
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@ -870,32 +920,99 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1"))
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1"), null)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndEmptySegmentIds()
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, ImmutableSet.of())
new DataSourcesResource.SegmentsToUpdateFilter(
Intervals.of("2020/2030"), ImmutableSet.of("seg1"), ImmutableList.of("v1", "v2")
)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsNoPayload()
public void testMarkAsUsedNonOvershadowedSegmentsWithEmptySegmentIds()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of(), null)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithEmptyVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of())
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of("v1", "v2"))
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of("segment1"), ImmutableList.of("v1", "v2"))
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of("v1", "v2"))
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
}
@Test
public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndEmptyVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of())
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
}
@Test
public void testSegmentLoadChecksForVersion()
{
@ -1039,12 +1156,13 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
.collect(Collectors.toSet()),
null
);
DataSourcesResource dataSourcesResource = createResource();
@ -1068,12 +1186,13 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
.collect(Collectors.toSet()),
null
);
DataSourcesResource dataSourcesResource = createResource();
@ -1099,16 +1218,16 @@ public class DataSourcesResourceTest
.once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
.collect(Collectors.toSet())
.collect(Collectors.toSet()),
null
);
DataSourcesResource dataSourcesResource =
createResource();
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@ -1120,11 +1239,11 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(1).once();
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(1).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null);
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@ -1140,11 +1259,11 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once();
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null);
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@ -1159,16 +1278,15 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval))
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null))
.andThrow(new RuntimeException("Exception occurred"))
.once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null);
DataSourcesResource dataSourcesResource =
createResource();
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@ -1179,11 +1297,12 @@ public class DataSourcesResourceTest
public void testMarkAsUnusedSegmentsInIntervalNoDataSource()
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once();
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null))
.andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null);
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@ -1193,6 +1312,59 @@ public class DataSourcesResourceTest
EasyMock.verify(segmentsMetadataManager);
}
@Test
public void testMarkAsUnusedSegmentsInIntervalWithVersions()
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("v1")))
.andReturn(2).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("v1"));
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 2), response.getEntity());
EasyMock.verify(segmentsMetadataManager);
}
@Test
public void testMarkAsUnusedSegmentsInIntervalWithNonExistentVersion()
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("foo")))
.andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("foo"));
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
EasyMock.verify(segmentsMetadataManager);
}
@Test
public void testSegmentsToUpdateFilterSerde() throws JsonProcessingException
{
final ObjectMapper mapper = new DefaultObjectMapper();
final String payload = "{\"interval\":\"2023-01-01T00:00:00.000Z/2024-01-01T00:00:00.000Z\",\"segmentIds\":null,\"versions\":[\"v1\"]}";
final DataSourcesResource.SegmentsToUpdateFilter obj =
mapper.readValue(payload, DataSourcesResource.SegmentsToUpdateFilter.class);
Assert.assertEquals(Intervals.of("2023/2024"), obj.getInterval());
Assert.assertEquals(ImmutableList.of("v1"), obj.getVersions());
Assert.assertNull(obj.getSegmentIds());
Assert.assertEquals(payload, mapper.writeValueAsString(obj));
}
@Test
public void testMarkSegmentsAsUnusedNullPayload()
{
@ -1202,18 +1374,19 @@ public class DataSourcesResourceTest
Assert.assertEquals(400, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(
"Invalid request payload, either interval or segmentIds array must be specified",
"Invalid request payload. Specify either 'interval' or 'segmentIds', but not both."
+ " Optionally, include 'versions' only when 'interval' is provided.",
response.getEntity()
);
}
@Test
public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIds()
public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null);
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(400, response.getStatus());
@ -1224,10 +1397,9 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedWithNonNullIntervalAndEmptySegmentIds()
{
DataSourcesResource dataSourcesResource = createResource();
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of());
prepareRequestForAudit();
final DataSourcesResource.SegmentsToUpdateFilter payload =
new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-01/P1D"), ImmutableSet.of(), null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(200, response.getStatus());