mirror of https://github.com/apache/druid.git
Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
Fixed an issue where the provisionerService which can be used to spawn resources as needed is left running on a non-leader coordinator/overlord, after it is removed from leadership. Provisioning should only be done by the leader. To fix the issue, a call to stop the provisionerService was added to the stop() method of HttpRemoteTaskRunner class. The provisionerService was properly closed on other TaskRunner types.
This commit is contained in:
parent
6ce14e6b17
commit
376d7c069d
|
@ -1342,6 +1342,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
|
||||
log.info("Stopping...");
|
||||
|
||||
if (provisioningService != null) {
|
||||
provisioningService.close();
|
||||
}
|
||||
pendingTasksExec.shutdownNow();
|
||||
workersSyncExec.shutdownNow();
|
||||
cleanupExec.shutdown();
|
||||
|
|
|
@ -42,6 +42,8 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
|
|||
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
|
||||
import org.apache.druid.indexing.overlord.TaskStorage;
|
||||
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
|
||||
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
|
||||
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
|
||||
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
|
||||
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
|
||||
import org.apache.druid.indexing.worker.TaskAnnouncement;
|
||||
|
@ -78,6 +80,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.easymock.EasyMock.isA;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -182,6 +186,145 @@ public class HttpRemoteTaskRunnerTest
|
|||
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
|
||||
}
|
||||
|
||||
/*
|
||||
Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things.
|
||||
*/
|
||||
@Test(timeout = 60_000L)
|
||||
public void testFreshStartAndStop()
|
||||
{
|
||||
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
|
||||
ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
ProvisioningService provisioningService = EasyMock.createNiceMock(ProvisioningService.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
|
||||
.andReturn(druidNodeDiscovery);
|
||||
EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
|
||||
.andReturn(provisioningService);
|
||||
provisioningService.close();
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
|
||||
|
||||
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
|
||||
TestHelper.makeJsonMapper(),
|
||||
new HttpRemoteTaskRunnerConfig()
|
||||
{
|
||||
@Override
|
||||
public int getPendingTasksRunnerNumThreads()
|
||||
{
|
||||
return 3;
|
||||
}
|
||||
},
|
||||
EasyMock.createNiceMock(HttpClient.class),
|
||||
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
|
||||
provisioningStrategy,
|
||||
druidNodeDiscoveryProvider,
|
||||
EasyMock.createNiceMock(TaskStorage.class),
|
||||
EasyMock.createNiceMock(CuratorFramework.class),
|
||||
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected WorkerHolder createWorkerHolder(
|
||||
ObjectMapper smileMapper,
|
||||
HttpClient httpClient,
|
||||
HttpRemoteTaskRunnerConfig config,
|
||||
ScheduledExecutorService workersSyncExec,
|
||||
WorkerHolder.Listener listener,
|
||||
Worker worker,
|
||||
List<TaskAnnouncement> knownAnnouncements
|
||||
)
|
||||
{
|
||||
return HttpRemoteTaskRunnerTest.createWorkerHolder(
|
||||
smileMapper,
|
||||
httpClient,
|
||||
config,
|
||||
workersSyncExec,
|
||||
listener,
|
||||
worker,
|
||||
ImmutableList.of(),
|
||||
ImmutableList.of(),
|
||||
ImmutableMap.of(),
|
||||
new AtomicInteger(),
|
||||
ImmutableSet.of()
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
taskRunner.start();
|
||||
taskRunner.stop();
|
||||
EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
|
||||
}
|
||||
|
||||
/*
|
||||
Simulates startup of Overlord with no provisoner. Overlord is then stopped and is expected to close down certain
|
||||
things.
|
||||
*/
|
||||
@Test(timeout = 60_000L)
|
||||
public void testFreshStartAndStopNoProvisioner()
|
||||
{
|
||||
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
|
||||
ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
|
||||
.andReturn(druidNodeDiscovery);
|
||||
EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
|
||||
.andReturn(null);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy);
|
||||
|
||||
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
|
||||
TestHelper.makeJsonMapper(),
|
||||
new HttpRemoteTaskRunnerConfig()
|
||||
{
|
||||
@Override
|
||||
public int getPendingTasksRunnerNumThreads()
|
||||
{
|
||||
return 3;
|
||||
}
|
||||
},
|
||||
EasyMock.createNiceMock(HttpClient.class),
|
||||
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
|
||||
provisioningStrategy,
|
||||
druidNodeDiscoveryProvider,
|
||||
EasyMock.createNiceMock(TaskStorage.class),
|
||||
EasyMock.createNiceMock(CuratorFramework.class),
|
||||
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected WorkerHolder createWorkerHolder(
|
||||
ObjectMapper smileMapper,
|
||||
HttpClient httpClient,
|
||||
HttpRemoteTaskRunnerConfig config,
|
||||
ScheduledExecutorService workersSyncExec,
|
||||
WorkerHolder.Listener listener,
|
||||
Worker worker,
|
||||
List<TaskAnnouncement> knownAnnouncements
|
||||
)
|
||||
{
|
||||
return HttpRemoteTaskRunnerTest.createWorkerHolder(
|
||||
smileMapper,
|
||||
httpClient,
|
||||
config,
|
||||
workersSyncExec,
|
||||
listener,
|
||||
worker,
|
||||
ImmutableList.of(),
|
||||
ImmutableList.of(),
|
||||
ImmutableMap.of(),
|
||||
new AtomicInteger(),
|
||||
ImmutableSet.of()
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
taskRunner.start();
|
||||
taskRunner.stop();
|
||||
EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy);
|
||||
}
|
||||
|
||||
/*
|
||||
Simulates one task not getting acknowledged to be running after assigning it to a worker. But, other tasks are
|
||||
successfully assigned to other worker and get completed.
|
||||
|
|
Loading…
Reference in New Issue