From f5b5cb93ead1cb3bbd7a525445be2461c977c52c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 1 Jul 2022 16:29:22 +0530 Subject: [PATCH] Fix expiry timeout bug in LocalIntermediateDataManager (#12722) The expiry timeout is compared against the current time but the condition is reversed. This means that as soon as a supervisor task finishes, its partitions are cleaned up, irrespective of the specified `intermediaryPartitionTimeout` period. After these changes, the `intermediaryPartitionTimeout` will start getting honored. Changes * Fix the condition * Add tests to verify the new correct behaviour * Reduce the default expiry timeout from P1D to PT5M to retain current behaviour in case of default configs. --- .../shuffle/LocalIntermediaryDataManager.java | 9 +- ...ntermediaryDataManagerAutoCleanupTest.java | 126 ++++++++++++------ .../indexing/worker/config/WorkerConfig.java | 2 +- 3 files changed, 93 insertions(+), 44 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java index 045e8dfb04a..d629773bfca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java @@ -151,7 +151,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager supervisorTaskChecker.scheduleAtFixedRate( () -> { try { - deleteExpiredSuprevisorTaskPartitionsIfNotRunning(); + deleteExpiredSupervisorTaskPartitionsIfNotRunning(); } catch (InterruptedException e) { LOG.error(e, "Error while cleaning up partitions for expired supervisors"); @@ -236,14 +236,13 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * the self-cleanup for when the cleanup request is missing. */ - private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws InterruptedException + private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException { - final DateTime now = DateTimes.nowUtc(); final Set expiredSupervisorTasks = new HashSet<>(); for (Entry entry : supervisorTaskCheckTimes.entrySet()) { final String supervisorTaskId = entry.getKey(); final DateTime checkTime = entry.getValue(); - if (checkTime.isAfter(now)) { + if (checkTime.isBeforeNow()) { expiredSupervisorTasks.add(supervisorTaskId); } } @@ -318,7 +317,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager try (final Closer resourceCloser = closer) { FileUtils.mkdirp(taskTempDir); - // Tempary compressed file. Will be removed when taskTempDir is deleted. + // Temporary compressed file. Will be removed when taskTempDir is deleted. final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile); if (unzippedSizeBytes == 0) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java index dcb214484d6..6120a884420 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java @@ -37,7 +37,6 @@ import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.ShardSpecLookup; import org.joda.time.Interval; import org.joda.time.Period; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -57,33 +56,13 @@ public class LocalIntermediaryDataManagerAutoCleanupTest @Rule public TemporaryFolder tempDir = new TemporaryFolder(); - private LocalIntermediaryDataManager intermediaryDataManager; + private TaskConfig taskConfig; + private IndexingServiceClient indexingServiceClient; @Before public void setup() throws IOException { - final WorkerConfig workerConfig = new WorkerConfig() - { - @Override - public long getIntermediaryPartitionDiscoveryPeriodSec() - { - return 1; - } - - @Override - public long getIntermediaryPartitionCleanupPeriodSec() - { - return 2; - } - - @Override - public Period getIntermediaryPartitionTimeout() - { - return new Period("PT2S"); - } - - }; - final TaskConfig taskConfig = new TaskConfig( + this.taskConfig = new TaskConfig( null, null, null, @@ -98,40 +77,79 @@ public class LocalIntermediaryDataManagerAutoCleanupTest TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null ); - final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() + this.indexingServiceClient = new NoopIndexingServiceClient() { @Override public Map getTaskStatuses(Set taskIds) { final Map result = new HashMap<>(); for (String taskId : taskIds) { - result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10)); + TaskState state = taskId.startsWith("running_") ? TaskState.RUNNING : TaskState.SUCCESS; + result.put(taskId, new TaskStatus(taskId, state, 10)); } return result; } }; - intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); - intermediaryDataManager.start(); - } - - @After - public void teardown() - { - intermediaryDataManager.stop(); } @Test - public void testCleanup() throws IOException, InterruptedException + public void testCompletedExpiredSupervisor() throws IOException, InterruptedException + { + Assert.assertTrue( + isCleanedUpAfter2s("supervisor_1", new Period("PT1S")) + ); + } + + @Test + public void testCompletedNotExpiredSupervisor() throws IOException, InterruptedException + { + Assert.assertFalse( + isCleanedUpAfter2s("supervisor_2", new Period("PT10S")) + ); + } + + @Test + public void testRunningSupervisor() throws IOException, InterruptedException + { + Assert.assertFalse( + isCleanedUpAfter2s("running_supervisor_1", new Period("PT1S")) + ); + } + + /** + * Creates a LocalIntermediaryDataManager and adds a segment to it. + * Also checks the cleanup status after 2s. + * + * @return true if the cleanup has happened after 2s, false otherwise. + */ + private boolean isCleanedUpAfter2s(String supervisorTaskId, Period timeoutPeriod) + throws IOException, InterruptedException { - final String supervisorTaskId = "supervisorTaskId"; final String subTaskId = "subTaskId"; final Interval interval = Intervals.of("2018/2019"); final File segmentFile = generateSegmentDir("test"); final DataSegment segment = newSegment(interval); + + // Setup data manager with expiry timeout 1s + WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod); + LocalIntermediaryDataManager intermediaryDataManager = + new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient); intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, segmentFile); - Thread.sleep(3000); - Assert.assertFalse(intermediaryDataManager.findPartitionFile(supervisorTaskId, subTaskId, interval, 0).isPresent()); + intermediaryDataManager + .findPartitionFile(supervisorTaskId, subTaskId, interval, 0); + + // Start the data manager and the cleanup cycle + intermediaryDataManager.start(); + + // Check the state of the partition after 2s + Thread.sleep(2000); + boolean partitionFileExists = intermediaryDataManager + .findPartitionFile(supervisorTaskId, subTaskId, interval, 0) + .isPresent(); + + intermediaryDataManager.stop(); + return !partitionFileExists; } private File generateSegmentDir(String fileName) throws IOException @@ -178,4 +196,36 @@ public class LocalIntermediaryDataManagerAutoCleanupTest throw new UnsupportedOperationException(); } } + + private static class TestWorkerConfig extends WorkerConfig + { + private final long cleanupPeriodSeconds; + private final long discoveryPeriodSeconds; + private final Period timeoutPeriod; + + private TestWorkerConfig(long cleanupPeriodSeconds, long discoveryPeriodSeconds, Period timeoutPeriod) + { + this.cleanupPeriodSeconds = cleanupPeriodSeconds; + this.discoveryPeriodSeconds = discoveryPeriodSeconds; + this.timeoutPeriod = timeoutPeriod; + } + + @Override + public long getIntermediaryPartitionCleanupPeriodSec() + { + return cleanupPeriodSeconds; + } + + @Override + public long getIntermediaryPartitionDiscoveryPeriodSec() + { + return discoveryPeriodSeconds; + } + + @Override + public Period getIntermediaryPartitionTimeout() + { + return timeoutPeriod; + } + } } diff --git a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java index 607cf51e6c4..d8a3701e940 100644 --- a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java +++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java @@ -53,7 +53,7 @@ public class WorkerConfig private long intermediaryPartitionCleanupPeriodSec = 300L; @JsonProperty - private Period intermediaryPartitionTimeout = new Period("P1D"); + private Period intermediaryPartitionTimeout = new Period("PT5M"); @JsonProperty private final long globalIngestionHeapLimitBytes = Runtime.getRuntime().maxMemory() / 6;