diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java index 252231f10b7..1f1cfd61f61 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateJobAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -31,10 +32,15 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; public class UpdateJobAction extends Action { public static final UpdateJobAction INSTANCE = new UpdateJobAction(); @@ -131,6 +137,7 @@ public class UpdateJobAction extends Action { + private final ConcurrentMap semaphoreByJob = ConcurrentCollections.newConcurrentMap(); private final JobManager jobManager; private final Client client; @@ -161,14 +168,33 @@ public class UpdateJobAction extends Action wrappedListener = listener; - if (request.getJobUpdate().isAutodetectProcessUpdate()) { - wrappedListener = ActionListener.wrap( + PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE); + boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED; + + semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire(); + + ActionListener wrappedListener; + if (jobIsOpen && request.getJobUpdate().isAutodetectProcessUpdate()) { + wrappedListener = ActionListener.wrap( response -> updateProcess(request, response, listener), - listener::onFailure); + e -> { + releaseJobSemaphore(request.getJobId()); + listener.onFailure(e); + }); + } + else { + wrappedListener = ActionListener.wrap( + response -> { + releaseJobSemaphore(request.getJobId()); + listener.onResponse(response); + }, + e -> { + releaseJobSemaphore(request.getJobId()); + listener.onFailure(e); + }); } - jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener); + jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, client, wrappedListener); } private void updateProcess(Request request, PutJobAction.Response updateConfigResponse, @@ -176,19 +202,26 @@ public class UpdateJobAction extends Action() { @Override public void onResponse(UpdateProcessAction.Response response) { + releaseJobSemaphore(request.getJobId()); listener.onResponse(updateConfigResponse); } @Override public void onFailure(Exception e) { + releaseJobSemaphore(request.getJobId()); listener.onFailure(e); } }); } + private void releaseJobSemaphore(String jobId) { + semaphoreByJob.remove(jobId).release(); + } + @Override protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index 192cdb49ad2..b036bcb5e6e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -37,7 +37,6 @@ import java.util.Objects; public class UpdateProcessAction extends Action { - public static final UpdateProcessAction INSTANCE = new UpdateProcessAction(); public static final String NAME = "cluster:admin/ml/job/update/process"; @@ -199,15 +198,8 @@ public class UpdateProcessAction extends protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { try { - if (request.getModelDebugConfig() != null) { - processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig()); - } - if (request.getDetectorUpdates() != null) { - for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) { - processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules()); - } - } - + processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), + request.getModelDebugConfig()); listener.onResponse(new Response()); } catch (Exception e) { listener.onFailure(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1d918d5f2f9..aa539c73f56 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -198,7 +198,9 @@ public class JobManager extends AbstractComponent { }); } - public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener actionListener) { + public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, Client client, + ActionListener actionListener) { + clusterService.submitStateUpdateTask("update-job-" + jobId, new AckedClusterStateUpdateTask(request, actionListener) { private Job updatedJob; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 467433a9585..d2825ba5fed 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -19,9 +19,9 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.JobManager; -import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -168,23 +168,25 @@ public class AutodetectProcessManager extends AbstractComponent { } } - public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException { + public void writeUpdateProcessMessage(String jobId, List updates, ModelDebugConfig config) + throws IOException { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); return; } - communicator.writeUpdateModelDebugMessage(config); - // TODO check for errors from autodetects - } - public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List rules) throws IOException { - AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); - if (communicator == null) { - logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); - return; + if (config != null) { + communicator.writeUpdateModelDebugMessage(config); + } + + if (updates != null) { + for (JobUpdate.DetectorUpdate update : updates) { + if (update.getRules() != null) { + communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules()); + } + } } - communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules); // TODO check for errors from autodetects } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index b3d46ef5be6..e8165f3b8d5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index a15334aefb1..7bd909275bf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -270,19 +271,14 @@ public class AutodetectProcessManagerTests extends ESTestCase { assertEquals("[foo] exception while flushing job", e.getMessage()); } - public void testWriteUpdateModelDebugMessage() throws IOException { + public void testwriteUpdateProcessMessage() throws IOException { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); ModelDebugConfig debugConfig = mock(ModelDebugConfig.class); - manager.writeUpdateModelDebugMessage("foo", debugConfig); - verify(communicator).writeUpdateModelDebugMessage(debugConfig); - } - - public void testWriteUpdateDetectorRulesMessage() throws IOException { - AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); - AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); List rules = Collections.singletonList(mock(DetectionRule.class)); - manager.writeUpdateDetectorRulesMessage("foo", 2, rules); + List detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules)); + manager.writeUpdateProcessMessage("foo", detectorUpdates, debugConfig); + verify(communicator).writeUpdateModelDebugMessage(debugConfig); verify(communicator).writeUpdateDetectorRulesMessage(2, rules); } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index 82f39605214..4de016d6054 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -214,12 +214,23 @@ } - match: { job_id: "to-update" } + - do: + xpack.ml.open_job: + job_id: to-update + - do: xpack.ml.update_job: job_id: to-update body: > { "description":"Post update description", + "detectors": [{"index": 0, "rules": {"target_field_name": "airline", + "rule_conditions": [ { "condition_type": "numerical_actual", + "condition": {"operator": "gt", "value": "10" } } ] } }, + {"index": 1, "description": "updated description"}], + "model_debug_config": { + "bounds_percentile": 99.0 + }, "analysis_limits": { "model_memory_limit": 20 }, @@ -234,8 +245,11 @@ } - match: { job_id: "to-update" } - match: { description: "Post update description" } + - match: { model_debug_config.bounds_percentile: 99.0 } - match: { analysis_limits.model_memory_limit: 20 } - match: { analysis_config.categorization_filters: ["cat3.*"] } + - match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" } + - match: { analysis_config.detectors.1.detector_description: "updated description" } - match: { renormalization_window_days: 10 } - match: { background_persist_interval: 10800 } - match: { model_snapshot_retention_days: 30 }