mirror of https://github.com/apache/druid.git
Fix sporadic fail of RemoteTaskRunnerTest#testWorkerRemoved
This commit is contained in:
parent
e38b7554e4
commit
8a179fc273
|
@ -20,6 +20,7 @@
|
||||||
package io.druid.indexing.overlord;
|
package io.druid.indexing.overlord;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
|
@ -1062,8 +1063,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
return ImmutableList.copyOf(lazyWorkers.values());
|
return ImmutableList.copyOf(lazyWorkers.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
|
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
|
||||||
{
|
{
|
||||||
return removedWorkerCleanups;
|
return removedWorkerCleanups;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
RemoteTaskRunnerConfig getRemoteTaskRunnerConfig()
|
||||||
|
{
|
||||||
|
return config;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,13 +95,18 @@ public class TestUtils
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean conditionValid(IndexingServiceCondition condition)
|
public static boolean conditionValid(IndexingServiceCondition condition)
|
||||||
|
{
|
||||||
|
return conditionValid(condition, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean conditionValid(IndexingServiceCondition condition, long timeout)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Stopwatch stopwatch = Stopwatch.createUnstarted();
|
Stopwatch stopwatch = Stopwatch.createUnstarted();
|
||||||
stopwatch.start();
|
stopwatch.start();
|
||||||
while (!condition.isValid()) {
|
while (!condition.isValid()) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
|
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > timeout) {
|
||||||
throw new ISE("Cannot find running task");
|
throw new ISE("Cannot find running task");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -408,6 +408,7 @@ public class RemoteTaskRunnerTest
|
||||||
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||||
|
|
||||||
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
|
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
|
||||||
|
RemoteTaskRunnerConfig config = remoteTaskRunner.getRemoteTaskRunnerConfig();
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
TestUtils.conditionValid(
|
TestUtils.conditionValid(
|
||||||
new IndexingServiceCondition()
|
new IndexingServiceCondition()
|
||||||
|
@ -417,7 +418,9 @@ public class RemoteTaskRunnerTest
|
||||||
{
|
{
|
||||||
return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
|
return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
// cleanup task is independently scheduled by event listener. we need to wait some more time.
|
||||||
|
config.getTaskCleanupTimeout().toStandardDuration().getMillis() * 2
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
Assert.assertNull(cf.checkExists().forPath(statusPath));
|
Assert.assertNull(cf.checkExists().forPath(statusPath));
|
||||||
|
|
Loading…
Reference in New Issue