diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index 41025507b6f..38f70408232 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -176,7 +176,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn { task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task); - Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode()); + Assert.assertEquals("Actual task status: " + taskStatus, expectedTaskStatus, taskStatus.getStatusCode()); } Set runTask(Task task, TaskState expectedTaskStatus) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 40094e3445c..ecb00917a9b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -407,13 +407,14 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase private TaskStatus runAndWait(Task task) { try { - return runTask(task).get(); + // 20 minutes should be enough for the tasks to finish. + return runTask(task).get(20, TimeUnit.MINUTES); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } - catch (ExecutionException e) { + catch (ExecutionException | TimeoutException e) { throw new RuntimeException(e); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java index 41508cb2d6b..79b0e4a011b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java @@ -98,7 +98,7 @@ public class LocalIntermediaryDataManagerAutoCleanupTest public void testCompletedExpiredSupervisor() throws IOException, InterruptedException { Assert.assertTrue( - isCleanedUpAfter2s("supervisor_1", new Period("PT1S")) + isCleanedUpAfter3s("supervisor_1", new Period("PT1S")) ); } @@ -106,7 +106,7 @@ public class LocalIntermediaryDataManagerAutoCleanupTest public void testCompletedNotExpiredSupervisor() throws IOException, InterruptedException { Assert.assertFalse( - isCleanedUpAfter2s("supervisor_2", new Period("PT10S")) + isCleanedUpAfter3s("supervisor_2", new Period("PT10S")) ); } @@ -114,17 +114,19 @@ public class LocalIntermediaryDataManagerAutoCleanupTest public void testRunningSupervisor() throws IOException, InterruptedException { Assert.assertFalse( - isCleanedUpAfter2s("running_supervisor_1", new Period("PT1S")) + isCleanedUpAfter3s("running_supervisor_1", new Period("PT1S")) ); } /** * Creates a LocalIntermediaryDataManager and adds a segment to it. - * Also checks the cleanup status after 2s. + * Also checks the cleanup status after 3s. + * We use 3 seconds to avoid race condition between clean up in LocalIntermediaryDataManager + * and checking of status in test. * - * @return true if the cleanup has happened after 2s, false otherwise. + * @return true if the cleanup has happened after 3s, false otherwise. */ - private boolean isCleanedUpAfter2s(String supervisorTaskId, Period timeoutPeriod) + private boolean isCleanedUpAfter3s(String supervisorTaskId, Period timeoutPeriod) throws IOException, InterruptedException { final String subTaskId = "subTaskId"; @@ -132,7 +134,7 @@ public class LocalIntermediaryDataManagerAutoCleanupTest final File segmentFile = generateSegmentDir("test"); final DataSegment segment = newSegment(interval); - // Setup data manager with expiry timeout 1s + // Setup data manager with expiry timeout 1s and initial delay of 1 second WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod); LocalIntermediaryDataManager intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, overlordClient); @@ -144,8 +146,8 @@ public class LocalIntermediaryDataManagerAutoCleanupTest // Start the data manager and the cleanup cycle intermediaryDataManager.start(); - // Check the state of the partition after 2s - Thread.sleep(2000); + // Check the state of the partition after 3s + Thread.sleep(3000); boolean partitionFileExists = intermediaryDataManager .findPartitionFile(supervisorTaskId, subTaskId, interval, 0) .isPresent();