[ML] Don't assign cancel holders to node tasks, but use the services that stop datafeed and jobs directly.

Relates to elastic/x-pack-elasticsearch#749

Original commit: elastic/x-pack-elasticsearch@ff074b0442
This commit is contained in:
Martijn van Groningen 2017-03-21 15:29:40 +01:00
parent 6fae1867ec
commit cb5d44c9c4
5 changed files with 28 additions and 20 deletions

View File

@ -40,7 +40,7 @@ public class InvalidLicenseEnforcer extends AbstractComponent {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
datafeedJobRunner.closeAllDatafeeds("invalid license"); datafeedJobRunner.stopAllDatafeeds("invalid license");
autodetectProcessManager.closeAllJobs("invalid license"); autodetectProcessManager.closeAllJobs("invalid license");
} }
}); });

View File

@ -66,7 +66,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.BiConsumer;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@ -221,7 +220,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
public static class JobTask extends NodePersistentTask { public static class JobTask extends NodePersistentTask {
private final String jobId; private final String jobId;
private volatile BiConsumer<Boolean, String> cancelHandler; private volatile AutodetectProcessManager autodetectProcessManager;
JobTask(String jobId, long id, String type, String action, TaskId parentTask) { JobTask(String jobId, long id, String type, String action, TaskId parentTask) {
super(id, type, action, "job-" + jobId, parentTask); super(id, type, action, "job-" + jobId, parentTask);
@ -236,7 +235,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
protected void onCancelled() { protected void onCancelled() {
String reason = getReasonCancelled(); String reason = getReasonCancelled();
boolean restart = CancelTasksRequest.DEFAULT_REASON.equals(reason) == false; boolean restart = CancelTasksRequest.DEFAULT_REASON.equals(reason) == false;
cancelHandler.accept(restart, reason); autodetectProcessManager.closeJob(jobId, restart, reason);
} }
static boolean match(Task task, String expectedJobId) { static boolean match(Task task, String expectedJobId) {
@ -327,14 +326,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override @Override
protected void nodeOperation(NodePersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) { protected void nodeOperation(NodePersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> { autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> {
if (e1 != null) { if (e1 != null) {
listener.onFailure(e1); listener.onFailure(e1);
return; return;
} }
JobTask jobTask = (JobTask) task;
jobTask.cancelHandler = (restart, reason) -> autodetectProcessManager.closeJob(request.getJobId(), restart, reason);
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) { if (e2 == null) {
listener.onResponse(new TransportResponse.Empty()); listener.onResponse(new TransportResponse.Empty());

View File

@ -230,7 +230,7 @@ public class StartDatafeedAction
private final String datafeedId; private final String datafeedId;
private final long startTime; private final long startTime;
private final Long endTime; private final Long endTime;
private volatile DatafeedJobRunner.Holder holder; private volatile DatafeedJobRunner datafeedJobRunner;
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) { public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
@ -239,6 +239,15 @@ public class StartDatafeedAction
this.endTime = request.endTime; this.endTime = request.endTime;
} }
/* only for testing */
public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request, DatafeedJobRunner datafeedJobRunner) {
super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId);
this.datafeedId = request.getDatafeedId();
this.startTime = request.startTime;
this.endTime = request.endTime;
this.datafeedJobRunner = datafeedJobRunner;
}
public String getDatafeedId() { public String getDatafeedId() {
return datafeedId; return datafeedId;
} }
@ -256,10 +265,6 @@ public class StartDatafeedAction
return endTime != null; return endTime != null;
} }
public void setHolder(DatafeedJobRunner.Holder holder) {
this.holder = holder;
}
@Override @Override
protected void onCancelled() { protected void onCancelled() {
stop(getReasonCancelled()); stop(getReasonCancelled());
@ -270,10 +275,7 @@ public class StartDatafeedAction
} }
public void stop(String reason, TimeValue timeout) { public void stop(String reason, TimeValue timeout) {
if (holder == null) { datafeedJobRunner.stopDatafeed(datafeedId, reason, timeout);
throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder");
}
holder.stop(reason, timeout, null);
} }
} }
@ -339,6 +341,7 @@ public class StartDatafeedAction
protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request, protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request,
ActionListener<TransportResponse.Empty> listener) { ActionListener<TransportResponse.Empty> listener) {
DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask; DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask;
datafeedTask.datafeedJobRunner = datafeedJobRunner;
datafeedJobRunner.run(datafeedTask, datafeedJobRunner.run(datafeedTask,
(error) -> { (error) -> {
if (error != null) { if (error != null) {

View File

@ -111,7 +111,14 @@ public class DatafeedJobRunner extends AbstractComponent {
}, handler); }, handler);
} }
public synchronized void closeAllDatafeeds(String reason) { public synchronized void stopDatafeed(String datafeedId, String reason, TimeValue timeout) {
Holder holder = runningDatafeeds.remove(datafeedId);
if (holder != null) {
holder.stop(reason, timeout, null);
}
}
public synchronized void stopAllDatafeeds(String reason) {
int numDatafeeds = runningDatafeeds.size(); int numDatafeeds = runningDatafeeds.size();
if (numDatafeeds != 0) { if (numDatafeeds != 0) {
logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason); logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason);
@ -225,9 +232,7 @@ public class DatafeedJobRunner extends AbstractComponent {
DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job);
DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(),
dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler); return new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler);
task.setHolder(holder);
return holder;
} }
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) { DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) {

View File

@ -271,7 +271,8 @@ public class DatafeedJobRunnerTests extends ESTestCase {
Consumer<Exception> handler = mockConsumer(); Consumer<Exception> handler = mockConsumer();
boolean cancelled = randomBoolean(); boolean cancelled = randomBoolean();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L);
StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest); StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null,
startDatafeedRequest, datafeedJobRunner);
datafeedJobRunner.run(task, handler); datafeedJobRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);