diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java
index 1bd36d35d49..43b5bd3ddd1 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/PrelertPlugin.java
@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.ParseFieldMatcherSupplier;
+import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
@@ -191,11 +192,26 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
                 // norelease: we will no longer need to pass the client here after we switch to a client based data extractor
                 new HttpDataExtractorFactory(client),
                 System::currentTimeMillis);
+
+        JobLifeCycleService jobLifeCycleService =
+                new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic());
+        // we hop on the lifecycle service of ResourceWatcherService, because
+        // that one is stopped before discovery is.
+        // (when discovery is stopped it will send a leave request to elected master node, which will then be removed
+        // from the cluster state, which then triggers other events)
+        resourceWatcherService.addLifecycleListener(new LifecycleListener() {
+
+            @Override
+            public void beforeStop() {
+                jobLifeCycleService.stop();
+            }
+        });
+
         return Arrays.asList(
                 jobProvider,
                 jobManager,
                 new JobAllocator(settings, clusterService, threadPool),
-                new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic()),
+                jobLifeCycleService,
                 new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query
                 dataProcessor,
                 new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java
index 437f17f5191..ff785154d3b 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleService.java
@@ -24,11 +24,13 @@ import java.util.concurrent.Executor;
 
 public class JobLifeCycleService extends AbstractComponent implements ClusterStateListener {
 
-    volatile Set<String> localAssignedJobs = new HashSet<>();
     private final Client client;
     private final DataProcessor dataProcessor;
     private final Executor executor;
 
+    volatile boolean stopped = false;
+    volatile Set<String> localAssignedJobs = new HashSet<>();
+
     public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, DataProcessor dataProcessor,
                                Executor executor) {
         super(settings);
@@ -46,6 +48,11 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
             return;
         }
 
+        if (stopped) {
+            logger.debug("no cluster state changes will be processed as the node has been stopped");
+            return;
+        }
+
         // Single volatile read:
         Set<String> localAssignedJobs = this.localAssignedJobs;
 
@@ -106,6 +113,25 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
         localAssignedJobs = newSet;
     }
 
+    /**
+     * Stops this service: sets {@link #stopped} so that later cluster state changes are ignored,
+     * then asks the {@link DataProcessor} to close every job in {@link #localAssignedJobs}.
+     * A failure to close one job is logged and does not prevent the remaining jobs from being closed.
+     */
+    public void stop() {
+        stopped = true;
+        Set<String> jobsToStop = this.localAssignedJobs;
+        for (String jobId : jobsToStop) {
+            try {
+                dataProcessor.closeJob(jobId);
+            } catch (Exception e) {
+                // in case of failure log it and continue with closing next job.
+                logger.error("Failed to close job [" + jobId + "] while stopping node", e);
+            }
+
+        }
+    }
+
     private void updateJobStatus(String jobId, JobStatus status, String reason) {
         UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
         request.setReason(reason);
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java
index a20e6722d67..0c9a9e8ea77 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/JobLifeCycleServiceTests.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class JobLifeCycleServiceTests extends ESTestCase {
 
@@ -169,4 +170,30 @@ public class JobLifeCycleServiceTests extends ESTestCase {
         expectedRequest.setReason("failed to close, error");
         verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
     }
+
+    public void testStop() {
+        jobLifeCycleService.localAssignedJobs.add("job1");
+        jobLifeCycleService.localAssignedJobs.add("job2");
+        assertFalse(jobLifeCycleService.stopped);
+
+        jobLifeCycleService.stop();
+        assertTrue(jobLifeCycleService.stopped);
+        verify(dataProcessor, times(1)).closeJob("job1");
+        verify(dataProcessor, times(1)).closeJob("job2");
+        verifyNoMoreInteractions(dataProcessor);
+    }
+
+    public void testStop_failure() {
+        jobLifeCycleService.localAssignedJobs.add("job1");
+        jobLifeCycleService.localAssignedJobs.add("job2");
+        assertFalse(jobLifeCycleService.stopped);
+
+        doThrow(new RuntimeException()).when(dataProcessor).closeJob("job1");
+        jobLifeCycleService.stop();
+        assertTrue(jobLifeCycleService.stopped);
+        verify(dataProcessor, times(1)).closeJob("job1");
+        verify(dataProcessor, times(1)).closeJob("job2");
+        verifyNoMoreInteractions(dataProcessor);
+    }
+
 }