diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index c88878f358e..61218e07ff0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -20,6 +20,7 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -1062,8 +1063,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer return ImmutableList.copyOf(lazyWorkers.values()); } + @VisibleForTesting ConcurrentMap getRemovedWorkerCleanups() { return removedWorkerCleanups; } + + @VisibleForTesting + RemoteTaskRunnerConfig getRemoteTaskRunnerConfig() + { + return config; + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java index 027fc25d8f4..af05058104c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -95,13 +95,18 @@ public class TestUtils } public static boolean conditionValid(IndexingServiceCondition condition) + { + return conditionValid(condition, 1000); + } + + public static boolean conditionValid(IndexingServiceCondition condition, long timeout) { try { Stopwatch stopwatch = Stopwatch.createUnstarted(); stopwatch.start(); while (!condition.isValid()) { Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > timeout) { throw new ISE("Cannot find running task"); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 576807fd97c..d37c1323b1c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -408,6 +408,7 @@ public class RemoteTaskRunnerTest TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode()); + RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig(); Assert.assertTrue( TestUtils.conditionValid( new IndexingServiceCondition() @@ -417,7 +418,9 @@ public class RemoteTaskRunnerTest { return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty(); } - } + }, + // cleanup task is independently scheduled by event listener. we need to wait some more time. + config.getTaskCleanupTimeout().toStandardDuration().getMillis() * 2 ) ); Assert.assertNull(cf.checkExists().forPath(statusPath));