[ML] Refactor so ML doesn't require PersistentTasksService at startup (elastic/x-pack-elasticsearch#3682)

At present the PersistentTasksService is created inside the ML plugin.
This is undesirable, as other plugins will use persistent tasks in the
near future.

This change refactors the startup code so that the PersistentTasksService
no longer needs to be passed to any constructors for ML components.

A future change will still be required to actually move the initialization
of the PersistentTasksClusterService, PersistentTasksService and
PersistentTasksExecutorRegistry out of the ML plugin, but following this
change it should be fairly simple.

Original commit: elastic/x-pack-elasticsearch@3c2a8e020e
This commit is contained in:
David Roberts 2018-01-23 16:55:08 +00:00 committed by GitHub
parent 223d3c1f4c
commit 697a08e742
4 changed files with 30 additions and 27 deletions

View File

@ -11,6 +11,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskCancelledException;
@ -19,6 +20,7 @@ import org.elasticsearch.tasks.TaskManager;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
/** /**
* Represents a executor node operation that corresponds to a persistent task * Represents a executor node operation that corresponds to a persistent task
@ -105,6 +107,15 @@ public class AllocatedPersistentTask extends CancellableTask {
COMPLETED // the task is done running and trying to notify caller COMPLETED // the task is done running and trying to notify caller
} }
/**
* Waits for this persistent task to have the desired state.
*/
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
@Nullable TimeValue timeout,
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
}
public void markAsCompleted() { public void markAsCompleted() {
completeAndNotifyIfNeeded(null); completeAndNotifyIfNeeded(null);
} }

View File

@ -360,8 +360,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client); PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
components.addAll(createComponents(client, clusterService, threadPool, xContentRegistry, environment, resourceWatcherService, components.addAll(createComponents(client, clusterService, threadPool, xContentRegistry, environment));
persistentTasksService));
// This was lifted from the XPackPlugins createComponents when it got split // This was lifted from the XPackPlugins createComponents when it got split
// This is not extensible and anyone copying this code needs to instead make this work // This is not extensible and anyone copying this code needs to instead make this work
@ -379,10 +378,11 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
return components; return components;
} }
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, // TODO: once initialization of the PersistentTasksClusterService, PersistentTasksService
NamedXContentRegistry xContentRegistry, Environment environment, // and PersistentTasksExecutorRegistry has been moved somewhere else the entire contents of
ResourceWatcherService resourceWatcherService, // this method can replace the entire contents of the overridden createComponents() method
PersistentTasksService persistentTasksService) { private Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Environment environment) {
if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
return emptyList(); return emptyList();
} }
@ -426,12 +426,13 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
this.autodetectProcessManager.set(autodetectProcessManager); this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, persistentTasksService); System::currentTimeMillis, auditor);
this.datafeedManager.set(datafeedManager); this.datafeedManager.set(datafeedManager);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
autodetectProcessManager); autodetectProcessManager);
InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); // This object's constructor attaches to the license state, so there's no need to retain another reference to it
new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
return Arrays.asList( return Arrays.asList(
mlLifeCycleService, mlLifeCycleService,
@ -442,7 +443,6 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
jobDataCountsPersister, jobDataCountsPersister,
datafeedManager, datafeedManager,
auditor, auditor,
invalidLicenseEnforcer,
new MlAssignmentNotifier(settings, auditor, clusterService) new MlAssignmentNotifier(settings, auditor, clusterService)
); );
} }

View File

@ -32,7 +32,6 @@ import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -55,7 +54,6 @@ public class DatafeedManager extends AbstractComponent {
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
private final Auditor auditor; private final Auditor auditor;
@ -66,14 +64,13 @@ public class DatafeedManager extends AbstractComponent {
private volatile boolean isolated; private volatile boolean isolated;
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) { Supplier<Long> currentTimeSupplier, Auditor auditor) {
super(Settings.EMPTY); super(Settings.EMPTY);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService); this.clusterService = Objects.requireNonNull(clusterService);
this.threadPool = threadPool; this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.auditor = Objects.requireNonNull(auditor); this.auditor = Objects.requireNonNull(auditor);
this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder); this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
clusterService.addListener(taskRunner); clusterService.addListener(taskRunner);
} }
@ -91,8 +88,7 @@ public class DatafeedManager extends AbstractComponent {
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap( ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> { datafeedJob -> {
Holder holder = new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob, Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler);
task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), taskHandler);
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() { task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override @Override
@ -279,7 +275,7 @@ public class DatafeedManager extends AbstractComponent {
public class Holder { public class Holder {
private final String taskId; private final TransportStartDatafeedAction.DatafeedTask task;
private final long allocationId; private final long allocationId;
private final DatafeedConfig datafeed; private final DatafeedConfig datafeed;
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
@ -291,13 +287,13 @@ public class DatafeedManager extends AbstractComponent {
volatile Future<?> future; volatile Future<?> future;
private volatile boolean isRelocating; private volatile boolean isRelocating;
Holder(String taskId, long allocationId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, Holder(TransportStartDatafeedAction.DatafeedTask task, DatafeedConfig datafeed, DatafeedJob datafeedJob,
ProblemTracker problemTracker, Consumer<Exception> handler) { ProblemTracker problemTracker, Consumer<Exception> handler) {
this.taskId = taskId; this.task = task;
this.allocationId = allocationId; this.allocationId = task.getAllocationId();
this.datafeed = datafeed; this.datafeed = datafeed;
this.datafeedJob = datafeedJob; this.datafeedJob = datafeedJob;
this.autoCloseJob = autoCloseJob; this.autoCloseJob = task.isLookbackOnly();
this.problemTracker = problemTracker; this.problemTracker = problemTracker;
this.handler = handler; this.handler = handler;
} }
@ -397,7 +393,7 @@ public class DatafeedManager extends AbstractComponent {
return; return;
} }
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20), task.waitForPersistentTaskStatus(Objects::isNull, TimeValue.timeValueSeconds(20),
new WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() { new WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() {
@Override @Override
public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) { public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {

View File

@ -41,7 +41,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.notifications.AuditorField;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -122,8 +121,6 @@ public class DatafeedManagerTests extends ESTestCase {
when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
datafeedJob = mock(DatafeedJob.class); datafeedJob = mock(DatafeedJob.class);
when(datafeedJob.isRunning()).thenReturn(true); when(datafeedJob.isRunning()).thenReturn(true);
when(datafeedJob.stop()).thenReturn(true); when(datafeedJob.stop()).thenReturn(true);
@ -135,8 +132,7 @@ public class DatafeedManagerTests extends ESTestCase {
return null; return null;
}).when(datafeedJobBuilder).build(any(), any(), any()); }).when(datafeedJobBuilder).build(any(), any(), any());
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor, datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor);
persistentTasksService);
verify(clusterService).addListener(capturedClusterStateListener.capture()); verify(clusterService).addListener(capturedClusterStateListener.capture());
} }