diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 3c28b3aa4a9..649900c841f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.duty; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; @@ -36,17 +37,20 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.metadata.SQLMetadataSegmentPublisher; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.SqlSegmentsMetadataManager; +import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; -import org.apache.druid.server.coordinator.simulate.TestSegmentsMetadataManager; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; -import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.hamcrest.MatcherAssert; @@ -56,14 +60,16 @@ import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; public class KillUnusedSegmentsTest @@ -87,17 +93,38 @@ public class KillUnusedSegmentsTest private static final String VERSION = "v1"; private final CoordinatorDynamicConfig.Builder dynamicConfigBuilder = CoordinatorDynamicConfig.builder(); - private TestSegmentsMetadataManager segmentsMetadataManager; private TestOverlordClient overlordClient; private TestDruidCoordinatorConfig.Builder configBuilder; private DruidCoordinatorRuntimeParams.Builder paramsBuilder; private KillUnusedSegments killDuty; + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private SqlSegmentsMetadataManager sqlSegmentsMetadataManager; + private SQLMetadataSegmentPublisher publisher; + @Before public void setup() { - segmentsMetadataManager = new TestSegmentsMetadataManager(); + final TestDerbyConnector connector = derbyConnectorRule.getConnector(); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); + config.setPollDuration(Period.millis(1)); + sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( + TestHelper.makeJsonMapper(), + Suppliers.ofInstance(config), + derbyConnectorRule.metadataTablesConfigSupplier(), + connector + ); + sqlSegmentsMetadataManager.start(); + + publisher = new SQLMetadataSegmentPublisher( + TestHelper.makeJsonMapper(), + derbyConnectorRule.metadataTablesConfigSupplier().get(), + connector + ); + connector.createSegmentTable(); + overlordClient = new TestOverlordClient(); configBuilder = new TestDruidCoordinatorConfig.Builder() .withCoordinatorIndexingPeriod(Duration.standardSeconds(0)) @@ -129,9 +156,9 @@ public class KillUnusedSegmentsTest Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS)); - Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); - validateLastKillStateAndReset(DS1, YEAR_OLD); + validateLastKillStateAndReset(DS1, Intervals.ETERNITY); } @Test @@ -584,17 +611,6 @@ public class KillUnusedSegmentsTest validateLastKillStateAndReset(DS1, firstHalfEternity); } - /** - *

- * Regardless of {@link DruidCoordinatorConfig#getCoordinatorKillIgnoreDurationToRetain()} configuration, - * auto-kill doesn't delete unused segments that end at {@link DateTimes#MAX}. - * This is because the kill duty uses {@link DateTimes#COMPARE_DATE_AS_STRING_MAX} as the - * datetime string comparison for the end endpoint when retrieving unused segment intervals. - *

- * For more information, see Issue#15951. - *

- */ - @Ignore @Test public void testKillEternitySegment() { @@ -613,18 +629,6 @@ public class KillUnusedSegmentsTest validateLastKillStateAndReset(DS1, Intervals.ETERNITY); } - /** - * Similar to {@link #testKillEternitySegment()} - *

- * Regardless of {@link DruidCoordinatorConfig#getCoordinatorKillIgnoreDurationToRetain()} configuration, - * auto-kill doesn't delete unused segments that end at {@link DateTimes#MAX}. - * This is because the kill duty uses {@link DateTimes#COMPARE_DATE_AS_STRING_MAX} as the - * datetime string comparison for the end endpoint when retrieving unused segment intervals. - *

- * For more information, see Issue#15951. - *

- */ - @Ignore @Test public void testKillSecondHalfEternitySegment() { @@ -644,6 +648,26 @@ public class KillUnusedSegmentsTest validateLastKillStateAndReset(DS1, secondHalfEternity); } + @Test + public void testKillLargeIntervalSegments() + { + final Interval largeTimeRange1 = Intervals.of("1990-01-01T00Z/19940-01-01T00Z"); + final Interval largeTimeRange2 = Intervals.of("-19940-01-01T00Z/1970-01-01T00Z"); + + createAndAddUnusedSegment(DS1, largeTimeRange1, VERSION, NOW.minusDays(60)); + createAndAddUnusedSegment(DS1, largeTimeRange2, VERSION, NOW.minusDays(60)); + + initDuty(); + final CoordinatorRunStats stats = runDutyAndGetStats(); + + Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + + validateLastKillStateAndReset(DS1, new Interval(largeTimeRange2.getStart(), largeTimeRange1.getEnd())); + } + @Test public void testKillMultipleSegmentsInSameInterval() { @@ -671,7 +695,7 @@ public class KillUnusedSegmentsTest Assert.assertThrows( DruidException.class, () -> new KillUnusedSegments( - segmentsMetadataManager, + sqlSegmentsMetadataManager, overlordClient, new TestDruidCoordinatorConfig.Builder() .withCoordinatorIndexingPeriod(Duration.standardSeconds(10)) @@ -697,7 +721,7 @@ public class KillUnusedSegmentsTest Assert.assertThrows( DruidException.class, () -> new KillUnusedSegments( - segmentsMetadataManager, + sqlSegmentsMetadataManager, overlordClient, new TestDruidCoordinatorConfig.Builder() .withCoordinatorKillMaxSegments(-5) @@ -745,12 +769,14 @@ public class KillUnusedSegmentsTest ) { final DataSegment segment = createSegment(dataSource, interval, version); - final DataSegmentPlus unusedSegmentPlus = new DataSegmentPlus( - segment, - DateTimes.nowUtc(), - lastUpdatedTime - ); - segmentsMetadataManager.addUnusedSegment(unusedSegmentPlus); + try { + publisher.publishSegment(segment); + } + catch (IOException e) { + throw new RuntimeException(e); + } + sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId())); + updateUsedStatusLastUpdated(segment, lastUpdatedTime); } private DataSegment createSegment(final String dataSource, final Interval interval, final String version) @@ -771,7 +797,7 @@ public class KillUnusedSegmentsTest private void initDuty() { - killDuty = new KillUnusedSegments(segmentsMetadataManager, overlordClient, configBuilder.build()); + killDuty = new KillUnusedSegments(sqlSegmentsMetadataManager, overlordClient, configBuilder.build()); } private CoordinatorRunStats runDutyAndGetStats() @@ -898,4 +924,25 @@ public class KillUnusedSegmentsTest observedDatasourceToLastKillTaskId.remove(dataSource); } } + + private void updateUsedStatusLastUpdated(DataSegment segment, DateTime lastUpdatedTime) + { + derbyConnectorRule.getConnector().retryWithHandle( + handle -> handle.update( + StringUtils.format( + "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", getSegmentsTable() + ), + lastUpdatedTime.toString(), + segment.getId().toString() + ) + ); + } + + private String getSegmentsTable() + { + return derbyConnectorRule.metadataTablesConfigSupplier() + .get() + .getSegmentsTable() + .toUpperCase(Locale.ENGLISH); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index 24d74aabc6d..adf12ae7054 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -23,7 +23,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SortOrder; import org.apache.druid.server.http.DataSegmentPlus; @@ -35,9 +34,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -48,7 +45,6 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager { private final ConcurrentMap allSegments = new ConcurrentHashMap<>(); private final ConcurrentMap usedSegments = new ConcurrentHashMap<>(); - private final ConcurrentMap unusedSegments = new ConcurrentHashMap<>(); private volatile DataSourcesSnapshot snapshot; @@ -66,13 +62,6 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager snapshot = null; } - public void addUnusedSegment(DataSegmentPlus segment) - { - unusedSegments.put(segment.getDataSegment().getId().toString(), segment); - allSegments.put(segment.getDataSegment().getId().toString(), segment.getDataSegment()); - snapshot = null; - } - @Override public void startPollingDatabasePeriodically() { @@ -136,12 +125,9 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager public int markSegmentsAsUnused(Set segmentIds) { int numModifiedSegments = 0; - final DateTime now = DateTimes.nowUtc(); for (SegmentId segmentId : segmentIds) { if (allSegments.containsKey(segmentId.toString())) { - DataSegment dataSegment = allSegments.get(segmentId.toString()); - unusedSegments.put(segmentId.toString(), new DataSegmentPlus(dataSegment, now, now)); usedSegments.remove(segmentId.toString()); ++numModifiedSegments; } @@ -238,28 +224,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager final DateTime maxUsedStatusLastUpdatedTime ) { - final List sortedUnusedSegmentPluses = new ArrayList<>(unusedSegments.values()); - sortedUnusedSegmentPluses.sort( - Comparator.comparingLong( - dataSegmentPlus -> dataSegmentPlus.getDataSegment().getInterval().getStartMillis() - ) - ); - - final List unusedSegmentIntervals = new ArrayList<>(); - - for (final DataSegmentPlus unusedSegmentPlus : sortedUnusedSegmentPluses) { - final DataSegment unusedSegment = unusedSegmentPlus.getDataSegment(); - if (dataSource.equals(unusedSegment.getDataSource())) { - final Interval interval = unusedSegment.getInterval(); - - if ((minStartTime == null || interval.getStart().isAfter(minStartTime)) && - interval.getEnd().isBefore(maxEndTime) && - unusedSegmentPlus.getUsedStatusLastUpdatedDate().isBefore(maxUsedStatusLastUpdatedTime)) { - unusedSegmentIntervals.add(interval); - } - } - } - return unusedSegmentIntervals.stream().limit(limit).collect(Collectors.toList()); + return null; } @Override