Fixup KillUnusedSegmentsTest (#16094)

Changes:
- Use an actual SqlSegmentsMetadataManager instead of TestSqlSegmentsMetadataManager
- Simplify TestSegmentsMetadataManager
- Add a test for large interval segments.
This commit is contained in:
Abhishek Radhakrishnan 2024-03-11 13:37:48 +05:30 committed by GitHub
parent 148ad32e75
commit c7f1872bd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 88 additions and 76 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.duty; package org.apache.druid.server.coordinator.duty;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -36,17 +37,20 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.SQLMetadataSegmentPublisher;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.server.coordinator.simulate.TestSegmentsMetadataManager;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NoneShardSpec;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
@ -56,14 +60,16 @@ import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
public class KillUnusedSegmentsTest public class KillUnusedSegmentsTest
@ -87,17 +93,38 @@ public class KillUnusedSegmentsTest
private static final String VERSION = "v1"; private static final String VERSION = "v1";
private final CoordinatorDynamicConfig.Builder dynamicConfigBuilder = CoordinatorDynamicConfig.builder(); private final CoordinatorDynamicConfig.Builder dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
private TestSegmentsMetadataManager segmentsMetadataManager;
private TestOverlordClient overlordClient; private TestOverlordClient overlordClient;
private TestDruidCoordinatorConfig.Builder configBuilder; private TestDruidCoordinatorConfig.Builder configBuilder;
private DruidCoordinatorRuntimeParams.Builder paramsBuilder; private DruidCoordinatorRuntimeParams.Builder paramsBuilder;
private KillUnusedSegments killDuty; private KillUnusedSegments killDuty;
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private SQLMetadataSegmentPublisher publisher;
@Before @Before
public void setup() public void setup()
{ {
segmentsMetadataManager = new TestSegmentsMetadataManager(); final TestDerbyConnector connector = derbyConnectorRule.getConnector();
SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
config.setPollDuration(Period.millis(1));
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
TestHelper.makeJsonMapper(),
Suppliers.ofInstance(config),
derbyConnectorRule.metadataTablesConfigSupplier(),
connector
);
sqlSegmentsMetadataManager.start();
publisher = new SQLMetadataSegmentPublisher(
TestHelper.makeJsonMapper(),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
connector
);
connector.createSegmentTable();
overlordClient = new TestOverlordClient(); overlordClient = new TestOverlordClient();
configBuilder = new TestDruidCoordinatorConfig.Builder() configBuilder = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.standardSeconds(0)) .withCoordinatorIndexingPeriod(Duration.standardSeconds(0))
@ -129,9 +156,9 @@ public class KillUnusedSegmentsTest
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS)); Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, YEAR_OLD); validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
} }
@Test @Test
@ -584,17 +611,6 @@ public class KillUnusedSegmentsTest
validateLastKillStateAndReset(DS1, firstHalfEternity); validateLastKillStateAndReset(DS1, firstHalfEternity);
} }
/**
* <p>
* Regardless of {@link DruidCoordinatorConfig#getCoordinatorKillIgnoreDurationToRetain()} configuration,
* auto-kill doesn't delete unused segments that end at {@link DateTimes#MAX}.
* This is because the kill duty uses {@link DateTimes#COMPARE_DATE_AS_STRING_MAX} as the
* datetime string comparison for the end endpoint when retrieving unused segment intervals.
* </p><p>
* For more information, see <a href="https://github.com/apache/druid/issues/15951"> Issue#15951</a>.
* </p>
*/
@Ignore
@Test @Test
public void testKillEternitySegment() public void testKillEternitySegment()
{ {
@ -613,18 +629,6 @@ public class KillUnusedSegmentsTest
validateLastKillStateAndReset(DS1, Intervals.ETERNITY); validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
} }
/**
* Similar to {@link #testKillEternitySegment()}
* <p>
* Regardless of {@link DruidCoordinatorConfig#getCoordinatorKillIgnoreDurationToRetain()} configuration,
* auto-kill doesn't delete unused segments that end at {@link DateTimes#MAX}.
* This is because the kill duty uses {@link DateTimes#COMPARE_DATE_AS_STRING_MAX} as the
* datetime string comparison for the end endpoint when retrieving unused segment intervals.
* </p><p>
* For more information, see <a href="https://github.com/apache/druid/issues/15951"> Issue#15951</a>.
* </p>
*/
@Ignore
@Test @Test
public void testKillSecondHalfEternitySegment() public void testKillSecondHalfEternitySegment()
{ {
@ -644,6 +648,26 @@ public class KillUnusedSegmentsTest
validateLastKillStateAndReset(DS1, secondHalfEternity); validateLastKillStateAndReset(DS1, secondHalfEternity);
} }
@Test
public void testKillLargeIntervalSegments()
{
final Interval largeTimeRange1 = Intervals.of("1990-01-01T00Z/19940-01-01T00Z");
final Interval largeTimeRange2 = Intervals.of("-19940-01-01T00Z/1970-01-01T00Z");
createAndAddUnusedSegment(DS1, largeTimeRange1, VERSION, NOW.minusDays(60));
createAndAddUnusedSegment(DS1, largeTimeRange2, VERSION, NOW.minusDays(60));
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(largeTimeRange2.getStart(), largeTimeRange1.getEnd()));
}
@Test @Test
public void testKillMultipleSegmentsInSameInterval() public void testKillMultipleSegmentsInSameInterval()
{ {
@ -671,7 +695,7 @@ public class KillUnusedSegmentsTest
Assert.assertThrows( Assert.assertThrows(
DruidException.class, DruidException.class,
() -> new KillUnusedSegments( () -> new KillUnusedSegments(
segmentsMetadataManager, sqlSegmentsMetadataManager,
overlordClient, overlordClient,
new TestDruidCoordinatorConfig.Builder() new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.standardSeconds(10)) .withCoordinatorIndexingPeriod(Duration.standardSeconds(10))
@ -697,7 +721,7 @@ public class KillUnusedSegmentsTest
Assert.assertThrows( Assert.assertThrows(
DruidException.class, DruidException.class,
() -> new KillUnusedSegments( () -> new KillUnusedSegments(
segmentsMetadataManager, sqlSegmentsMetadataManager,
overlordClient, overlordClient,
new TestDruidCoordinatorConfig.Builder() new TestDruidCoordinatorConfig.Builder()
.withCoordinatorKillMaxSegments(-5) .withCoordinatorKillMaxSegments(-5)
@ -745,12 +769,14 @@ public class KillUnusedSegmentsTest
) )
{ {
final DataSegment segment = createSegment(dataSource, interval, version); final DataSegment segment = createSegment(dataSource, interval, version);
final DataSegmentPlus unusedSegmentPlus = new DataSegmentPlus( try {
segment, publisher.publishSegment(segment);
DateTimes.nowUtc(), }
lastUpdatedTime catch (IOException e) {
); throw new RuntimeException(e);
segmentsMetadataManager.addUnusedSegment(unusedSegmentPlus); }
sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId()));
updateUsedStatusLastUpdated(segment, lastUpdatedTime);
} }
private DataSegment createSegment(final String dataSource, final Interval interval, final String version) private DataSegment createSegment(final String dataSource, final Interval interval, final String version)
@ -771,7 +797,7 @@ public class KillUnusedSegmentsTest
private void initDuty() private void initDuty()
{ {
killDuty = new KillUnusedSegments(segmentsMetadataManager, overlordClient, configBuilder.build()); killDuty = new KillUnusedSegments(sqlSegmentsMetadataManager, overlordClient, configBuilder.build());
} }
private CoordinatorRunStats runDutyAndGetStats() private CoordinatorRunStats runDutyAndGetStats()
@ -898,4 +924,25 @@ public class KillUnusedSegmentsTest
observedDatasourceToLastKillTaskId.remove(dataSource); observedDatasourceToLastKillTaskId.remove(dataSource);
} }
} }
private void updateUsedStatusLastUpdated(DataSegment segment, DateTime lastUpdatedTime)
{
derbyConnectorRule.getConnector().retryWithHandle(
handle -> handle.update(
StringUtils.format(
"UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", getSegmentsTable()
),
lastUpdatedTime.toString(),
segment.getId().toString()
)
);
}
private String getSegmentsTable()
{
return derbyConnectorRule.metadataTablesConfigSupplier()
.get()
.getSegmentsTable()
.toUpperCase(Locale.ENGLISH);
}
} }

View File

@ -23,7 +23,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SortOrder; import org.apache.druid.metadata.SortOrder;
import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.server.http.DataSegmentPlus;
@ -35,9 +34,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -48,7 +45,6 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
{ {
private final ConcurrentMap<String, DataSegment> allSegments = new ConcurrentHashMap<>(); private final ConcurrentMap<String, DataSegment> allSegments = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>(); private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DataSegmentPlus> unusedSegments = new ConcurrentHashMap<>();
private volatile DataSourcesSnapshot snapshot; private volatile DataSourcesSnapshot snapshot;
@ -66,13 +62,6 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
snapshot = null; snapshot = null;
} }
public void addUnusedSegment(DataSegmentPlus segment)
{
unusedSegments.put(segment.getDataSegment().getId().toString(), segment);
allSegments.put(segment.getDataSegment().getId().toString(), segment.getDataSegment());
snapshot = null;
}
@Override @Override
public void startPollingDatabasePeriodically() public void startPollingDatabasePeriodically()
{ {
@ -136,12 +125,9 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
public int markSegmentsAsUnused(Set<SegmentId> segmentIds) public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
{ {
int numModifiedSegments = 0; int numModifiedSegments = 0;
final DateTime now = DateTimes.nowUtc();
for (SegmentId segmentId : segmentIds) { for (SegmentId segmentId : segmentIds) {
if (allSegments.containsKey(segmentId.toString())) { if (allSegments.containsKey(segmentId.toString())) {
DataSegment dataSegment = allSegments.get(segmentId.toString());
unusedSegments.put(segmentId.toString(), new DataSegmentPlus(dataSegment, now, now));
usedSegments.remove(segmentId.toString()); usedSegments.remove(segmentId.toString());
++numModifiedSegments; ++numModifiedSegments;
} }
@ -238,28 +224,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
final DateTime maxUsedStatusLastUpdatedTime final DateTime maxUsedStatusLastUpdatedTime
) )
{ {
final List<DataSegmentPlus> sortedUnusedSegmentPluses = new ArrayList<>(unusedSegments.values()); return null;
sortedUnusedSegmentPluses.sort(
Comparator.comparingLong(
dataSegmentPlus -> dataSegmentPlus.getDataSegment().getInterval().getStartMillis()
)
);
final List<Interval> unusedSegmentIntervals = new ArrayList<>();
for (final DataSegmentPlus unusedSegmentPlus : sortedUnusedSegmentPluses) {
final DataSegment unusedSegment = unusedSegmentPlus.getDataSegment();
if (dataSource.equals(unusedSegment.getDataSource())) {
final Interval interval = unusedSegment.getInterval();
if ((minStartTime == null || interval.getStart().isAfter(minStartTime)) &&
interval.getEnd().isBefore(maxEndTime) &&
unusedSegmentPlus.getUsedStatusLastUpdatedDate().isBefore(maxUsedStatusLastUpdatedTime)) {
unusedSegmentIntervals.add(interval);
}
}
}
return unusedSegmentIntervals.stream().limit(limit).collect(Collectors.toList());
} }
@Override @Override