diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 5b0a645b569..499e569e6db 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -28,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams; @@ -94,10 +92,6 @@ public class FlushJobAction extends Action listener) { - jobManager.getJobOrThrowIfUnknown(request.getJobId()); - InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); if (request.getAdvanceTime() != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index 4be40231cdd..b67c0a5d126 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; @@ -223,11 +221,10 @@ public class PostDataAction extends Action listener) { + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription())); try { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index a411bc41509..bea8c998635 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -7,16 +7,14 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; @@ -25,18 +23,18 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; import java.util.List; -import java.util.function.Function; import java.util.function.Supplier; /** @@ -47,55 +45,49 @@ import java.util.function.Supplier; public abstract class TransportJobTaskAction, Response extends BaseTasksResponse & Writeable> extends TransportTasksAction { - protected final JobManager jobManager; protected final AutodetectProcessManager processManager; - private final Function jobIdFromRequest; - private final TransportListTasksAction listTasksAction; TransportJobTaskAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier requestSupplier, - Supplier responseSupplier, String nodeExecutor, JobManager jobManager, - AutodetectProcessManager processManager, Function jobIdFromRequest, - TransportListTasksAction listTasksAction) { + Supplier responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) { super(settings, actionName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, requestSupplier, responseSupplier, nodeExecutor); - this.jobManager = jobManager; this.processManager = processManager; - this.jobIdFromRequest = jobIdFromRequest; - this.listTasksAction = listTasksAction; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { - // the same validation that exists in AutodetectProcessManager#processData(...) and flush(...) methods - // is required here too because if the job hasn't been opened yet then no task exist for it yet and then - // #taskOperation(...) method will not be invoked, returning an empty result to the client. - // This ensures that we return an understandable error: - String jobId = jobIdFromRequest.apply(request); - jobManager.getJobOrThrowIfUnknown(jobId); - JobState jobState = jobManager.getJobState(jobId); - if (jobState != JobState.OPENED) { - listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + jobState + + String jobId = request.getJobId(); + // We need to check whether there is at least an assigned task here, otherwise we cannot redirect to the + // node running the job task. + ClusterState state = clusterService.state(); + JobManager.getJobOrThrowIfUnknown(state, jobId); + PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress.PersistentTaskInProgress jobTask = MlMetadata.getJobTask(jobId, tasks); + if (jobTask == null || jobTask.getExecutorNode() == null) { + listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); } else { - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - listTasksRequest.setActions(OpenJobAction.NAME + "[c]"); - listTasksAction.execute(listTasksRequest, ActionListener.wrap(listTasksResponse -> { - String expectedDescription = "job-" + jobId; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (expectedDescription.equals(taskInfo.getDescription())) { - request.setTaskId(taskInfo.getTaskId()); - super.doExecute(task, request, listener); - return; - } - } - listener.onFailure(new ResourceNotFoundException("task not found for job [" + jobId + "] " + listTasksResponse)); - }, listener::onFailure)); + request.setNodes(jobTask.getExecutorNode()); + super.doExecute(task, request, listener); } } + @Override + protected final void taskOperation(Request request, OperationTask task, ActionListener listener) { + PersistentTasksInProgress tasks = clusterService.state().metaData().custom(PersistentTasksInProgress.TYPE); + JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks); + if (jobState == JobState.OPENED) { + innerTaskOperation(request, task, listener); + } else { + listener.onFailure(new ElasticsearchStatusException("job [" + request.getJobId() + "] state is [" + jobState + + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); + } + } + + protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener listener); + @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index b036bcb5e6e..1cb051ebc95 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; @@ -25,7 +24,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -181,10 +179,9 @@ public class UpdateProcessAction extends @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - JobManager jobManager, AutodetectProcessManager processManager, TransportListTasksAction listTasksAction) { + AutodetectProcessManager processManager) { super(settings, NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, jobManager, processManager, Request::getJobId, - listTasksAction); + Request::new, Response::new, MachineLearning.THREAD_POOL_NAME, processManager); } @Override @@ -195,7 +192,7 @@ public class UpdateProcessAction extends } @Override - protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { + protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { try { processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index aa539c73f56..cd0cde07e01 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; @@ -150,7 +149,7 @@ public class JobManager extends AbstractComponent { * @throws org.elasticsearch.ResourceNotFoundException * if there is no job with matching the given {@code jobId} */ - Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { + public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); Job job = mlMetadata.getJobs().get(jobId); if (job == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 2e46da1556e..7fba2cf459c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -116,15 +116,9 @@ public class AutodetectProcessManager extends AbstractComponent { * @return Count of records, fields, bytes, etc written */ public DataCounts processData(String jobId, InputStream input, DataLoadParams params) { - JobState jobState = jobManager.getJobState(jobId); - if (jobState != JobState.OPENED) { - throw new IllegalArgumentException("job [" + jobId + "] state is [" + jobState + "], but must be [" - + JobState.OPENED + "] for processing data"); - } - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - throw new IllegalStateException("job [" + jobId + "] with state [" + jobState + "] hasn't been started"); + throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); } try { return communicator.writeToJob(input, params); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 77957f1dfe6..e0e7f376bf3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -66,19 +66,17 @@ public class JobManagerTests extends ESTestCase { } public void testGetJobOrThrowIfUnknown_GivenUnknownJob() { - JobManager jobManager = createJobManager(); ClusterState cs = createClusterState(); - ESTestCase.expectThrows(ResourceNotFoundException.class, () -> jobManager.getJobOrThrowIfUnknown(cs, "foo")); + ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo")); } public void testGetJobOrThrowIfUnknown_GivenKnownJob() { - JobManager jobManager = createJobManager(); Job job = buildJobBuilder("foo").build(); MlMetadata mlMetadata = new MlMetadata.Builder().putJob(job, false).build(); ClusterState cs = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata)).build(); - assertEquals(job, jobManager.getJobOrThrowIfUnknown(cs, "foo")); + assertEquals(job, JobManager.getJobOrThrowIfUnknown(cs, "foo")); } public void testGetJob_GivenJobIdIsAll() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 7bd909275bf..90f16a038e8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -94,7 +94,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class)); jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); - givenAllocationWithState(JobState.OPENED); when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo")); doAnswer(invocationOnMock -> { @@ -298,7 +297,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); when(communicator.writeToJob(any(), any())).thenReturn(new DataCounts("foo")); AutodetectProcessManager manager = createManager(communicator); - givenAllocationWithState(JobState.OPENED); InputStream inputStream = createInputStream(""); manager.openJob("foo", 1L, false, e -> {}); @@ -333,10 +331,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(autodetectProcess, times(1)).close(); } - private void givenAllocationWithState(JobState state) { - when(jobManager.getJobState("foo")).thenReturn(state); - } - private AutodetectProcessManager createManager(AutodetectCommunicator communicator) { Client client = mock(Client.class); return createManager(communicator, client); diff --git a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index ae9a8da7cbe..ad05e719efc 100644 --- a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -5,9 +5,9 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; @@ -16,8 +16,10 @@ import org.elasticsearch.xpack.ml.MachineLearning; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentType.JSON; @@ -35,7 +37,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertTrue((Boolean) ml.get("enabled")); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/592") public void testMiniFarequote() throws Exception { String jobId = "foo1"; createFarequoteJob(jobId); @@ -43,6 +44,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); + assertBusy(this::assertSameClusterStateOnAllNodes); String postData = "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + @@ -89,7 +91,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(200, response.getStatusLine().getStatusCode()); } - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/592") public void testMiniFarequoteWithDatafeeder() throws Exception { String mappings = "{" + " \"mappings\": {" @@ -201,4 +202,28 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false); } + // When open job api returns the cluster state on nodes other than master node or node that acted as coordinating node, + // may not have had the latest update with job state set to opened. This may fail subsequent post data, flush, or + // close calls until that node that is running the job task has applied the cluster state where job state has been set to opened. + // this method waits until all nodes in the cluster have the same cluster state version, so that such failures can be + // avoided in tests. Note that the job has been started on the node running the job task (autodetect process is running), + // this is just a workaround for inconsistency in cluster states that may happen for a small amount of time. + private void assertSameClusterStateOnAllNodes(){ + assert getClusterHosts().size() > 1; + Set versions = new HashSet<>(); + for (HttpHost host : getClusterHosts()) { + try { + // Client round robins between cluster hosts: + Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true")); + assertEquals(200, response.getStatusLine().getStatusCode()); + int version = (Integer) responseEntityToMap(response).get("version"); + logger.info("Sampled version [{}]", version); + versions.add(version); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + assertEquals(1, versions.size()); + } + }