Fix expiry timeout bug in LocalIntermediateDataManager (#12722)

The expiry timeout is compared against the current time but the condition is reversed.
This means that as soon as a supervisor task finishes, its partitions are cleaned up,
irrespective of the specified `intermediaryPartitionTimeout` period.

After these changes, the `intermediaryPartitionTimeout` will start getting honored.

Changes
* Fix the condition
* Add tests to verify the new correct behaviour
* Reduce the default expiry timeout from P1D to PT5M
   to retain current behaviour in case of default configs.
This commit is contained in:
Kashif Faraz 2022-07-01 16:29:22 +05:30 committed by GitHub
parent 48731710fb
commit f5b5cb93ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 44 deletions

View File

@ -151,7 +151,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
supervisorTaskChecker.scheduleAtFixedRate( supervisorTaskChecker.scheduleAtFixedRate(
() -> { () -> {
try { try {
deleteExpiredSuprevisorTaskPartitionsIfNotRunning(); deleteExpiredSupervisorTaskPartitionsIfNotRunning();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
LOG.error(e, "Error while cleaning up partitions for expired supervisors"); LOG.error(e, "Error while cleaning up partitions for expired supervisors");
@ -236,14 +236,13 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
* Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger * Note that the overlord sends a cleanup request when a supervisorTask is finished. The below check is to trigger
* the self-cleanup for when the cleanup request is missing. * the self-cleanup for when the cleanup request is missing.
*/ */
private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws InterruptedException private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
{ {
final DateTime now = DateTimes.nowUtc();
final Set<String> expiredSupervisorTasks = new HashSet<>(); final Set<String> expiredSupervisorTasks = new HashSet<>();
for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) { for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
final String supervisorTaskId = entry.getKey(); final String supervisorTaskId = entry.getKey();
final DateTime checkTime = entry.getValue(); final DateTime checkTime = entry.getValue();
if (checkTime.isAfter(now)) { if (checkTime.isBeforeNow()) {
expiredSupervisorTasks.add(supervisorTaskId); expiredSupervisorTasks.add(supervisorTaskId);
} }
} }
@ -318,7 +317,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
try (final Closer resourceCloser = closer) { try (final Closer resourceCloser = closer) {
FileUtils.mkdirp(taskTempDir); FileUtils.mkdirp(taskTempDir);
// Tempary compressed file. Will be removed when taskTempDir is deleted. // Temporary compressed file. Will be removed when taskTempDir is deleted.
final File tempZippedFile = new File(taskTempDir, segment.getId().toString()); final File tempZippedFile = new File(taskTempDir, segment.getId().toString());
final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile); final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, tempZippedFile);
if (unzippedSizeBytes == 0) { if (unzippedSizeBytes == 0) {

View File

@ -37,7 +37,6 @@ import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecLookup; import org.apache.druid.timeline.partition.ShardSpecLookup;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -57,33 +56,13 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
@Rule @Rule
public TemporaryFolder tempDir = new TemporaryFolder(); public TemporaryFolder tempDir = new TemporaryFolder();
private LocalIntermediaryDataManager intermediaryDataManager; private TaskConfig taskConfig;
private IndexingServiceClient indexingServiceClient;
@Before @Before
public void setup() throws IOException public void setup() throws IOException
{ {
final WorkerConfig workerConfig = new WorkerConfig() this.taskConfig = new TaskConfig(
{
@Override
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return 1;
}
@Override
public long getIntermediaryPartitionCleanupPeriodSec()
{
return 2;
}
@Override
public Period getIntermediaryPartitionTimeout()
{
return new Period("PT2S");
}
};
final TaskConfig taskConfig = new TaskConfig(
null, null,
null, null,
null, null,
@ -98,40 +77,79 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null null
); );
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() this.indexingServiceClient = new NoopIndexingServiceClient()
{ {
@Override @Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
{ {
final Map<String, TaskStatus> result = new HashMap<>(); final Map<String, TaskStatus> result = new HashMap<>();
for (String taskId : taskIds) { for (String taskId : taskIds) {
result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10)); TaskState state = taskId.startsWith("running_") ? TaskState.RUNNING : TaskState.SUCCESS;
result.put(taskId, new TaskStatus(taskId, state, 10));
} }
return result; return result;
} }
}; };
intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
intermediaryDataManager.start();
}
@After
public void teardown()
{
intermediaryDataManager.stop();
} }
@Test @Test
public void testCleanup() throws IOException, InterruptedException public void testCompletedExpiredSupervisor() throws IOException, InterruptedException
{
Assert.assertTrue(
isCleanedUpAfter2s("supervisor_1", new Period("PT1S"))
);
}
@Test
public void testCompletedNotExpiredSupervisor() throws IOException, InterruptedException
{
Assert.assertFalse(
isCleanedUpAfter2s("supervisor_2", new Period("PT10S"))
);
}
@Test
public void testRunningSupervisor() throws IOException, InterruptedException
{
Assert.assertFalse(
isCleanedUpAfter2s("running_supervisor_1", new Period("PT1S"))
);
}
/**
* Creates a LocalIntermediaryDataManager and adds a segment to it.
* Also checks the cleanup status after 2s.
*
* @return true if the cleanup has happened after 2s, false otherwise.
*/
private boolean isCleanedUpAfter2s(String supervisorTaskId, Period timeoutPeriod)
throws IOException, InterruptedException
{ {
final String supervisorTaskId = "supervisorTaskId";
final String subTaskId = "subTaskId"; final String subTaskId = "subTaskId";
final Interval interval = Intervals.of("2018/2019"); final Interval interval = Intervals.of("2018/2019");
final File segmentFile = generateSegmentDir("test"); final File segmentFile = generateSegmentDir("test");
final DataSegment segment = newSegment(interval); final DataSegment segment = newSegment(interval);
// Setup data manager with expiry timeout 1s
WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod);
LocalIntermediaryDataManager intermediaryDataManager =
new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, segmentFile); intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, segmentFile);
Thread.sleep(3000); intermediaryDataManager
Assert.assertFalse(intermediaryDataManager.findPartitionFile(supervisorTaskId, subTaskId, interval, 0).isPresent()); .findPartitionFile(supervisorTaskId, subTaskId, interval, 0);
// Start the data manager and the cleanup cycle
intermediaryDataManager.start();
// Check the state of the partition after 2s
Thread.sleep(2000);
boolean partitionFileExists = intermediaryDataManager
.findPartitionFile(supervisorTaskId, subTaskId, interval, 0)
.isPresent();
intermediaryDataManager.stop();
return !partitionFileExists;
} }
private File generateSegmentDir(String fileName) throws IOException private File generateSegmentDir(String fileName) throws IOException
@ -178,4 +196,36 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
} }
private static class TestWorkerConfig extends WorkerConfig
{
private final long cleanupPeriodSeconds;
private final long discoveryPeriodSeconds;
private final Period timeoutPeriod;
private TestWorkerConfig(long cleanupPeriodSeconds, long discoveryPeriodSeconds, Period timeoutPeriod)
{
this.cleanupPeriodSeconds = cleanupPeriodSeconds;
this.discoveryPeriodSeconds = discoveryPeriodSeconds;
this.timeoutPeriod = timeoutPeriod;
}
@Override
public long getIntermediaryPartitionCleanupPeriodSec()
{
return cleanupPeriodSeconds;
}
@Override
public long getIntermediaryPartitionDiscoveryPeriodSec()
{
return discoveryPeriodSeconds;
}
@Override
public Period getIntermediaryPartitionTimeout()
{
return timeoutPeriod;
}
}
} }

View File

@ -53,7 +53,7 @@ public class WorkerConfig
private long intermediaryPartitionCleanupPeriodSec = 300L; private long intermediaryPartitionCleanupPeriodSec = 300L;
@JsonProperty @JsonProperty
private Period intermediaryPartitionTimeout = new Period("P1D"); private Period intermediaryPartitionTimeout = new Period("PT5M");
@JsonProperty @JsonProperty
private final long globalIngestionHeapLimitBytes = Runtime.getRuntime().maxMemory() / 6; private final long globalIngestionHeapLimitBytes = Runtime.getRuntime().maxMemory() / 6;