Fixed missed stopping of SchedulerEngine (#39193)
The SchedulerEngine is used in several places in our code and not all of these usages properly stopped the SchedulerEngine, which could lead to test failures due to leaked threads from the SchedulerEngine. This change adds stopping to these usages in order to avoid the thread leaks that cause CI failures and noise. Closes #38875
This commit is contained in:
parent
44df76251f
commit
697911c31d
|
@ -126,8 +126,12 @@ public class SchedulerEngine {
|
||||||
public void stop() {
|
public void stop() {
|
||||||
scheduler.shutdownNow();
|
scheduler.shutdownNow();
|
||||||
try {
|
try {
|
||||||
scheduler.awaitTermination(5, TimeUnit.SECONDS);
|
final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS);
|
||||||
|
if (terminated == false) {
|
||||||
|
logger.warn("scheduler engine was not terminated after waiting 5s");
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
logger.warn("interrupted while waiting for scheduler engine termination");
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private final boolean transportClientMode;
|
private final boolean transportClientMode;
|
||||||
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
|
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
|
||||||
|
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
|
||||||
|
|
||||||
public DataFrame(Settings settings) {
|
public DataFrame(Settings settings) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
|
@ -201,12 +202,12 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
|
||||||
return emptyList();
|
return emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC());
|
schedulerEngine.set(new SchedulerEngine(settings, Clock.systemUTC()));
|
||||||
|
|
||||||
// the transforms config manager should have been created
|
// the transforms config manager should have been created
|
||||||
assert dataFrameTransformsConfigManager.get() != null;
|
assert dataFrameTransformsConfigManager.get() != null;
|
||||||
return Collections.singletonList(
|
return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
|
||||||
new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), schedulerEngine, threadPool));
|
schedulerEngine.get(), threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -223,4 +224,11 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
|
||||||
DataFrameTransformState::fromXContent)
|
DataFrameTransformState::fromXContent)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (schedulerEngine.get() != null) {
|
||||||
|
schedulerEngine.get().stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,4 +68,4 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
|
||||||
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
|
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
|
||||||
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers);
|
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.component.Lifecycle.State;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
@ -53,8 +54,6 @@ public class IndexLifecycleService
|
||||||
private final PolicyStepsRegistry policyRegistry;
|
private final PolicyStepsRegistry policyRegistry;
|
||||||
private final IndexLifecycleRunner lifecycleRunner;
|
private final IndexLifecycleRunner lifecycleRunner;
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private final ThreadPool threadPool;
|
|
||||||
private Client client;
|
|
||||||
private ClusterService clusterService;
|
private ClusterService clusterService;
|
||||||
private LongSupplier nowSupplier;
|
private LongSupplier nowSupplier;
|
||||||
private SchedulerEngine.Job scheduledJob;
|
private SchedulerEngine.Job scheduledJob;
|
||||||
|
@ -63,13 +62,11 @@ public class IndexLifecycleService
|
||||||
LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) {
|
LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) {
|
||||||
super();
|
super();
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.client = client;
|
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
this.nowSupplier = nowSupplier;
|
this.nowSupplier = nowSupplier;
|
||||||
this.scheduledJob = null;
|
this.scheduledJob = null;
|
||||||
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
|
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
|
||||||
this.threadPool = threadPool;
|
|
||||||
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier);
|
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier);
|
||||||
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
|
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
|
||||||
clusterService.addStateApplier(this);
|
clusterService.addStateApplier(this);
|
||||||
|
@ -158,14 +155,21 @@ public class IndexLifecycleService
|
||||||
return scheduledJob;
|
return scheduledJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeScheduleJob() {
|
private synchronized void maybeScheduleJob() {
|
||||||
if (this.isMaster) {
|
if (this.isMaster) {
|
||||||
if (scheduler.get() == null) {
|
if (scheduler.get() == null) {
|
||||||
scheduler.set(new SchedulerEngine(settings, clock));
|
// don't create scheduler if the node is shutting down
|
||||||
scheduler.get().register(this);
|
if (isClusterServiceStoppedOrClosed() == false) {
|
||||||
|
scheduler.set(new SchedulerEngine(settings, clock));
|
||||||
|
scheduler.get().register(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// scheduler could be null if the node might be shutting down
|
||||||
|
if (scheduler.get() != null) {
|
||||||
|
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
|
||||||
|
scheduler.get().add(scheduledJob);
|
||||||
}
|
}
|
||||||
scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval));
|
|
||||||
scheduler.get().add(scheduledJob);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,7 +258,11 @@ public class IndexLifecycleService
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public synchronized void close() {
|
||||||
|
// this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in
|
||||||
|
// progress, which is that the cluster service is stopped and closed at some point prior to closing plugins
|
||||||
|
assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugin, which is expected to happen after " +
|
||||||
|
"the cluster service is stopped";
|
||||||
SchedulerEngine engine = scheduler.get();
|
SchedulerEngine engine = scheduler.get();
|
||||||
if (engine != null) {
|
if (engine != null) {
|
||||||
engine.stop();
|
engine.stop();
|
||||||
|
@ -265,4 +273,13 @@ public class IndexLifecycleService
|
||||||
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
|
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
|
||||||
new OperationModeUpdateTask(mode));
|
new OperationModeUpdateTask(mode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that checks if the lifecycle state of the cluster service is stopped or closed. This
|
||||||
|
* enhances the readability of the code.
|
||||||
|
*/
|
||||||
|
private boolean isClusterServiceStoppedOrClosed() {
|
||||||
|
final State state = clusterService.lifecycleState();
|
||||||
|
return state == State.STOPPED || state == State.CLOSED;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
|
import org.elasticsearch.common.component.Lifecycle.State;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
|
@ -91,6 +92,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||||
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
|
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
|
||||||
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings,
|
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings,
|
||||||
Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING)));
|
Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING)));
|
||||||
|
when(clusterService.lifecycleState()).thenReturn(State.STARTED);
|
||||||
|
|
||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
AdminClient adminClient = mock(AdminClient.class);
|
AdminClient adminClient = mock(AdminClient.class);
|
||||||
|
@ -108,6 +110,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void cleanup() {
|
public void cleanup() {
|
||||||
|
when(clusterService.lifecycleState()).thenReturn(randomFrom(State.STOPPED, State.CLOSED));
|
||||||
indexLifecycleService.close();
|
indexLifecycleService.close();
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.rollup;
|
package org.elasticsearch.xpack.rollup;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
|
@ -104,6 +105,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
|
||||||
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
|
new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
|
||||||
|
|
||||||
|
|
||||||
|
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
private final boolean transportClientMode;
|
private final boolean transportClientMode;
|
||||||
|
@ -195,12 +197,19 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
|
||||||
return emptyList();
|
return emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
|
schedulerEngine.set(new SchedulerEngine(settings, getClock()));
|
||||||
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine, threadPool));
|
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine.get(), threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
// overridable by tests
|
// overridable by tests
|
||||||
protected Clock getClock() {
|
protected Clock getClock() {
|
||||||
return Clock.systemUTC();
|
return Clock.systemUTC();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (schedulerEngine.get() != null) {
|
||||||
|
schedulerEngine.get().stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue