Fix shutdown in httpRemote task runner (#13558)

* Fix shutdown in httpRemote task runner

* Add UT
AmatyaAvadhanula authored 2022-12-22 14:50:04 +05:30; committed by GitHub
parent 0d97e658b2
commit af05cfa78c
3 changed files with 210 additions and 5 deletions
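The diff below changes the shutdown path of HttpRemoteTaskRunner: instead of unconditionally removing the work item from the runner's task map, shutdown now only looks it up, signals the worker to stop a RUNNING task while keeping the entry (so the task's final status announcement can still be matched and reported), and removes the entry only when the task is already COMPLETE. The following is a minimal sketch of that pattern using simplified, hypothetical types (ShutdownSketch, WorkItem, State), not the actual Druid classes; it only illustrates the before/after difference on the map operation.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal sketch, not Druid code: ShutdownSketch, WorkItem, and State are
    // simplified stand-ins used only to illustrate the shutdown handling.
    public class ShutdownSketch
    {
      enum State { PENDING, RUNNING, COMPLETE }

      static class WorkItem
      {
        final String taskId;
        State state;

        WorkItem(String taskId, State state)
        {
          this.taskId = taskId;
          this.state = state;
        }
      }

      private final Map<String, WorkItem> tasks = new ConcurrentHashMap<>();

      void add(String taskId, State state)
      {
        tasks.put(taskId, new WorkItem(taskId, state));
      }

      void shutdown(String taskId)
      {
        // Before the fix: tasks.remove(taskId) here dropped a RUNNING entry
        // before the worker reported the task's final status.
        WorkItem item = tasks.get(taskId);
        if (item == null) {
          return; // unknown task, nothing to shut down
        }
        if (item.state == State.RUNNING) {
          // signal the worker to stop the task, but keep the entry so the
          // completion announcement can still be matched to this work item
        } else if (item.state == State.COMPLETE) {
          // completed tasks are removed when the TaskQueue asks for cleanup
          tasks.remove(taskId);
        }
      }
    }

The new tests added in this commit (testKilledTasksEmitRuntimeMetricWithHttpRemote and testShutdown) exercise exactly these two branches: a running task survives shutdown until its final announcement arrives, and a completed task is removed from the known tasks.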

File: org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java

@@ -1310,13 +1310,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
WorkerHolder workerHolderRunningTask = null;
synchronized (statusLock) {
log.info("Shutdown [%s] because: [%s]", taskId, reason);
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId);
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
if (taskRunnerWorkItem != null) {
if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
workerHolderRunningTask = workers.get(taskRunnerWorkItem.getWorker().getHost());
if (workerHolderRunningTask == null) {
log.info("Can't shutdown! No worker running task[%s]", taskId);
}
} else if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
tasks.remove(taskId);
}
} else {
log.info("Received shutdown task[%s], but can't find it. Ignored.", taskId);
@@ -1466,7 +1468,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return Optional.fromNullable(provisioningService.getStats());
}
void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder workerHolder)
@VisibleForTesting
public void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder workerHolder)
{
final String taskId = announcement.getTaskId();
final Worker worker = workerHolder.getWorker();

File: org/apache/druid/indexing/overlord/TaskQueueTest.java

@@ -24,6 +24,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
@@ -36,19 +41,34 @@ import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerTest;
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -60,6 +80,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class TaskQueueTest extends IngestionTestBase
{
@@ -350,6 +372,124 @@ public class TaskQueueTest extends IngestionTestBase
);
}
@Test
public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsException, InterruptedException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1"));
final StubServiceEmitter metricsVerifier = new StubServiceEmitter("druid/overlord", "testHost");
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes();
workerHolder.incrementContinuouslyFailedTasksCount();
EasyMock.expectLastCall();
workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(1);
EasyMock.replay(workerHolder);
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
actionClientFactory,
getLockbox(),
metricsVerifier
);
taskQueue.setActive(true);
final Task task = new TestTask(
"t1",
Intervals.of("2021-01-01/P1D"),
ImmutableMap.of(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
false
)
);
taskQueue.add(task);
taskQueue.manageInternal();
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
task,
TaskStatus.running(task.getId()),
TaskLocation.create("worker", 1, 2)
), workerHolder);
while (!taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toList())
.contains(task.getId())) {
Thread.sleep(100);
}
taskQueue.shutdown(task.getId(), "shutdown");
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
task,
TaskStatus.failure(task.getId(), "shutdown"),
TaskLocation.create("worker", 1, 2)
), workerHolder);
taskQueue.manageInternal();
metricsVerifier.getEvents();
metricsVerifier.verifyEmitted("task/run/time", 1);
}
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> runningTasks)
{
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
for (String taskId : runningTasks) {
EasyMock.expect(taskStorageMock.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId)));
}
EasyMock.replay(taskStorageMock);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 3;
}
},
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new NoopProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new StubServiceEmitter("druid/overlord", "testHost")
);
taskRunner.start();
taskRunner.registerListener(
new TaskRunnerListener()
{
@Override
public String getListenerId()
{
return "test-listener";
}
@Override
public void locationChanged(String taskId, TaskLocation newLocation)
{
// do nothing
}
@Override
public void statusChanged(String taskId, TaskStatus status)
{
// do nothing
}
},
Execs.directExecutor()
);
return taskRunner;
}
private static class TestTask extends AbstractBatchIndexTask
{
private final Interval interval;

File: org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java

@@ -80,6 +80,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.easymock.EasyMock.isA;
@@ -1606,7 +1607,68 @@ public class HttpRemoteTaskRunnerTest
);
}
private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
/**
* Validate the internal state of tasks within the task runner
* when shutdown is called on pending or running tasks and on completed tasks
*/
@Test
public void testShutdown()
{
List<Object> listenerNotificationsAccumulator = new ArrayList<>();
HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
EasyMock.createStrictMock(TaskStorage.class),
listenerNotificationsAccumulator
);
Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY);
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
workerHolder.resetContinuouslyFailedTasksCount();
EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
EasyMock.replay(workerHolder);
taskRunner.start();
Task pendingTask = NoopTask.create("pendingTask");
taskRunner.run(pendingTask);
// Pending task is not cleaned up immediately
taskRunner.shutdown(pendingTask.getId(), "Forced shutdown");
Assert.assertTrue(taskRunner.getKnownTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet())
.contains(pendingTask.getId())
);
Task completedTask = NoopTask.create("completedTask");
taskRunner.run(completedTask);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
completedTask,
TaskStatus.success(completedTask.getId()),
TaskLocation.create("worker", 1, 2)
), workerHolder);
Assert.assertEquals(completedTask.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
// Completed tasks are cleaned up when shutdown is invoked on them (by TaskQueue)
taskRunner.shutdown(completedTask.getId(), "Cleanup");
Assert.assertFalse(taskRunner.getKnownTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.collect(Collectors.toSet())
.contains(completedTask.getId())
);
}
public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator
)
@@ -1837,7 +1899,7 @@ public class HttpRemoteTaskRunnerTest
};
}
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
public static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
private List<Listener> listeners;
@@ -1872,7 +1934,7 @@
}
}
private interface CustomFunction
public interface CustomFunction
{
WorkerHolder apply(
ObjectMapper smileMapper,