Stop local allocated jobs before closing node.

Original commit: elastic/x-pack-elasticsearch@ad3ed7c86a
This commit is contained in:
Martijn van Groningen 2016-12-16 12:26:00 +01:00
parent c43a9ba1dd
commit eaca8fb06a
3 changed files with 66 additions and 2 deletions

View File

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -191,11 +192,26 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor // norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client), new HttpDataExtractorFactory(client),
System::currentTimeMillis); System::currentTimeMillis);
JobLifeCycleService jobLifeCycleService =
new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic());
// we hop on the lifecycle service of ResourceWatcherService, because
// that one is stopped before discovery is.
// (when discovery is stopped it will send a leave request to elected master node, which will then be removed
// from the cluster state, which then triggers other events)
resourceWatcherService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
jobLifeCycleService.stop();
}
});
return Arrays.asList( return Arrays.asList(
jobProvider, jobProvider,
jobManager, jobManager,
new JobAllocator(settings, clusterService, threadPool), new JobAllocator(settings, clusterService, threadPool),
new JobLifeCycleService(settings, client, clusterService, dataProcessor, threadPool.generic()), jobLifeCycleService,
new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query
dataProcessor, dataProcessor,
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider), new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),

View File

@ -24,11 +24,13 @@ import java.util.concurrent.Executor;
public class JobLifeCycleService extends AbstractComponent implements ClusterStateListener { public class JobLifeCycleService extends AbstractComponent implements ClusterStateListener {
volatile Set<String> localAssignedJobs = new HashSet<>();
private final Client client; private final Client client;
private final DataProcessor dataProcessor; private final DataProcessor dataProcessor;
private final Executor executor; private final Executor executor;
volatile boolean stopped = false;
volatile Set<String> localAssignedJobs = new HashSet<>();
public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, DataProcessor dataProcessor, public JobLifeCycleService(Settings settings, Client client, ClusterService clusterService, DataProcessor dataProcessor,
Executor executor) { Executor executor) {
super(settings); super(settings);
@ -46,6 +48,11 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
return; return;
} }
if (stopped) {
logger.debug("no cluster state changes will be processed as the node has been stopped");
return;
}
// Single volatile read: // Single volatile read:
Set<String> localAssignedJobs = this.localAssignedJobs; Set<String> localAssignedJobs = this.localAssignedJobs;
@ -106,6 +113,20 @@ public class JobLifeCycleService extends AbstractComponent implements ClusterSta
localAssignedJobs = newSet; localAssignedJobs = newSet;
} }
public void stop() {
stopped = true;
Set<String> jobsToStop = this.localAssignedJobs;
for (String jobId : jobsToStop) {
try {
dataProcessor.closeJob(jobId);
} catch (Exception e) {
// in case of failure log it and continue with closing next job.
logger.error("Failed to close job [" + jobId + "] while stopping node", e);
}
}
}
private void updateJobStatus(String jobId, JobStatus status, String reason) { private void updateJobStatus(String jobId, JobStatus status, String reason) {
UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status); UpdateJobStatusAction.Request request = new UpdateJobStatusAction.Request(jobId, status);
request.setReason(reason); request.setReason(reason);

View File

@ -29,6 +29,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class JobLifeCycleServiceTests extends ESTestCase { public class JobLifeCycleServiceTests extends ESTestCase {
@ -169,4 +170,30 @@ public class JobLifeCycleServiceTests extends ESTestCase {
expectedRequest.setReason("failed to close, error"); expectedRequest.setReason("failed to close, error");
verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any()); verify(client).execute(eq(UpdateJobStatusAction.INSTANCE), eq(expectedRequest), any());
} }
public void testStop() {
jobLifeCycleService.localAssignedJobs.add("job1");
jobLifeCycleService.localAssignedJobs.add("job2");
assertFalse(jobLifeCycleService.stopped);
jobLifeCycleService.stop();
assertTrue(jobLifeCycleService.stopped);
verify(dataProcessor, times(1)).closeJob("job1");
verify(dataProcessor, times(1)).closeJob("job2");
verifyNoMoreInteractions(dataProcessor);
}
public void testStop_failure() {
jobLifeCycleService.localAssignedJobs.add("job1");
jobLifeCycleService.localAssignedJobs.add("job2");
assertFalse(jobLifeCycleService.stopped);
doThrow(new RuntimeException()).when(dataProcessor).closeJob("job1");
jobLifeCycleService.stop();
assertTrue(jobLifeCycleService.stopped);
verify(dataProcessor, times(1)).closeJob("job1");
verify(dataProcessor, times(1)).closeJob("job2");
verifyNoMoreInteractions(dataProcessor);
}
} }