[ML] Synchronise updates to running C++ process (elastic/x-pack-elasticsearch#557)

* Update running process from cluster state update

* Put a big lock on the UpdateProcessAction

* Protect concurrent update of the process with a semaphore

* Review comments

Original commit: elastic/x-pack-elasticsearch@d08b53247a
This commit is contained in:
David Kyle 2017-02-17 16:00:46 +00:00 committed by GitHub
parent 1a4b87454c
commit 45d228dd41
7 changed files with 75 additions and 37 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -31,10 +32,15 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> { public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> {
public static final UpdateJobAction INSTANCE = new UpdateJobAction(); public static final UpdateJobAction INSTANCE = new UpdateJobAction();
@ -131,6 +137,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> { public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> {
private final ConcurrentMap<String, Semaphore> semaphoreByJob = ConcurrentCollections.newConcurrentMap();
private final JobManager jobManager; private final JobManager jobManager;
private final Client client; private final Client client;
@ -161,14 +168,33 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update"); throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
} }
ActionListener<PutJobAction.Response> wrappedListener = listener; PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE);
if (request.getJobUpdate().isAutodetectProcessUpdate()) { boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED;
semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire();
ActionListener<PutJobAction.Response> wrappedListener;
if (jobIsOpen && request.getJobUpdate().isAutodetectProcessUpdate()) {
wrappedListener = ActionListener.wrap( wrappedListener = ActionListener.wrap(
response -> updateProcess(request, response, listener), response -> updateProcess(request, response, listener),
listener::onFailure); e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
}
else {
wrappedListener = ActionListener.wrap(
response -> {
releaseJobSemaphore(request.getJobId());
listener.onResponse(response);
},
e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
} }
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener); jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, client, wrappedListener);
} }
private void updateProcess(Request request, PutJobAction.Response updateConfigResponse, private void updateProcess(Request request, PutJobAction.Response updateConfigResponse,
@ -176,19 +202,26 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(), UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(),
request.getJobUpdate().getModelDebugConfig(), request.getJobUpdate().getDetectorUpdates()); request.getJobUpdate().getModelDebugConfig(), request.getJobUpdate().getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener<UpdateProcessAction.Response>() { client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener<UpdateProcessAction.Response>() {
@Override @Override
public void onResponse(UpdateProcessAction.Response response) { public void onResponse(UpdateProcessAction.Response response) {
releaseJobSemaphore(request.getJobId());
listener.onResponse(updateConfigResponse); listener.onResponse(updateConfigResponse);
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e); listener.onFailure(e);
} }
}); });
} }
private void releaseJobSemaphore(String jobId) {
semaphoreByJob.remove(jobId).release();
}
@Override @Override
protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) { protected ClusterBlockException checkBlock(UpdateJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

View File

@ -37,7 +37,6 @@ import java.util.Objects;
public class UpdateProcessAction extends public class UpdateProcessAction extends
Action<UpdateProcessAction.Request, UpdateProcessAction.Response, UpdateProcessAction.RequestBuilder> { Action<UpdateProcessAction.Request, UpdateProcessAction.Response, UpdateProcessAction.RequestBuilder> {
public static final UpdateProcessAction INSTANCE = new UpdateProcessAction(); public static final UpdateProcessAction INSTANCE = new UpdateProcessAction();
public static final String NAME = "cluster:admin/ml/job/update/process"; public static final String NAME = "cluster:admin/ml/job/update/process";
@ -199,15 +198,8 @@ public class UpdateProcessAction extends
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) { protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> { threadPool.executor(MachineLearning.THREAD_POOL_NAME).execute(() -> {
try { try {
if (request.getModelDebugConfig() != null) { processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
processManager.writeUpdateModelDebugMessage(request.getJobId(), request.getModelDebugConfig()); request.getModelDebugConfig());
}
if (request.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : request.getDetectorUpdates()) {
processManager.writeUpdateDetectorRulesMessage(request.getJobId(), update.getIndex(), update.getRules());
}
}
listener.onResponse(new Response()); listener.onResponse(new Response());
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);

View File

@ -198,7 +198,9 @@ public class JobManager extends AbstractComponent {
}); });
} }
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) { public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, Client client,
ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId, clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) { new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private Job updatedJob; private Job updatedJob;

View File

@ -19,9 +19,9 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -168,23 +168,25 @@ public class AutodetectProcessManager extends AbstractComponent {
} }
} }
public void writeUpdateModelDebugMessage(String jobId, ModelDebugConfig config) throws IOException { public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelDebugConfig config)
throws IOException {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
if (communicator == null) { if (communicator == null) {
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return; return;
} }
communicator.writeUpdateModelDebugMessage(config);
// TODO check for errors from autodetects
}
public void writeUpdateDetectorRulesMessage(String jobId, int detectorIndex, List<DetectionRule> rules) throws IOException { if (config != null) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); communicator.writeUpdateModelDebugMessage(config);
if (communicator == null) { }
logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId);
return; if (updates != null) {
for (JobUpdate.DetectorUpdate update : updates) {
if (update.getRules() != null) {
communicator.writeUpdateDetectorRulesMessage(update.getIndex(), update.getRules());
}
}
} }
communicator.writeUpdateDetectorRulesMessage(detectorIndex, rules);
// TODO check for errors from autodetects // TODO check for errors from autodetects
} }

View File

@ -12,7 +12,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;

View File

@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig; import org.elasticsearch.xpack.ml.job.config.ModelDebugConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -270,19 +271,14 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals("[foo] exception while flushing job", e.getMessage()); assertEquals("[foo] exception while flushing job", e.getMessage());
} }
public void testWriteUpdateModelDebugMessage() throws IOException { public void testwriteUpdateProcessMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo"); AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
ModelDebugConfig debugConfig = mock(ModelDebugConfig.class); ModelDebugConfig debugConfig = mock(ModelDebugConfig.class);
manager.writeUpdateModelDebugMessage("foo", debugConfig);
verify(communicator).writeUpdateModelDebugMessage(debugConfig);
}
public void testWriteUpdateDetectorRulesMessage() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManagerAndCallProcessData(communicator, "foo");
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class)); List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
manager.writeUpdateDetectorRulesMessage("foo", 2, rules); List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
manager.writeUpdateProcessMessage("foo", detectorUpdates, debugConfig);
verify(communicator).writeUpdateModelDebugMessage(debugConfig);
verify(communicator).writeUpdateDetectorRulesMessage(2, rules); verify(communicator).writeUpdateDetectorRulesMessage(2, rules);
} }

View File

@ -214,12 +214,23 @@
} }
- match: { job_id: "to-update" } - match: { job_id: "to-update" }
- do:
xpack.ml.open_job:
job_id: to-update
- do: - do:
xpack.ml.update_job: xpack.ml.update_job:
job_id: to-update job_id: to-update
body: > body: >
{ {
"description":"Post update description", "description":"Post update description",
"detectors": [{"index": 0, "rules": {"target_field_name": "airline",
"rule_conditions": [ { "condition_type": "numerical_actual",
"condition": {"operator": "gt", "value": "10" } } ] } },
{"index": 1, "description": "updated description"}],
"model_debug_config": {
"bounds_percentile": 99.0
},
"analysis_limits": { "analysis_limits": {
"model_memory_limit": 20 "model_memory_limit": 20
}, },
@ -234,8 +245,11 @@
} }
- match: { job_id: "to-update" } - match: { job_id: "to-update" }
- match: { description: "Post update description" } - match: { description: "Post update description" }
- match: { model_debug_config.bounds_percentile: 99.0 }
- match: { analysis_limits.model_memory_limit: 20 } - match: { analysis_limits.model_memory_limit: 20 }
- match: { analysis_config.categorization_filters: ["cat3.*"] } - match: { analysis_config.categorization_filters: ["cat3.*"] }
- match: { analysis_config.detectors.0.detector_rules.0.target_field_name: "airline" }
- match: { analysis_config.detectors.1.detector_description: "updated description" }
- match: { renormalization_window_days: 10 } - match: { renormalization_window_days: 10 }
- match: { background_persist_interval: 10800 } - match: { background_persist_interval: 10800 }
- match: { model_snapshot_retention_days: 30 } - match: { model_snapshot_retention_days: 30 }