From 2f4dcf36a90b3cc3a526814b1b260775357e11fc Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 13 Feb 2018 18:46:00 +0000 Subject: [PATCH] [ML] Add notification for job updates coming from a user action (elastic/x-pack-elasticsearch#3890) We were missing a notification for when a job is updated. This is useful so users know that there's been changes which could justify a change in the job behaviour. In addition, having those notifications allows our integrations tests to know when the update was processed which avoids having to use `sleep()` with its instabilities. Original commit: elastic/x-pack-elasticsearch@0b4eda22329a2d9f0ed280d0e8555bfadc2e964f --- .../xpack/core/ml/action/UpdateJobAction.java | 34 ++++++++- .../xpack/core/ml/job/config/JobUpdate.java | 46 ++++++++++++ .../xpack/core/ml/job/messages/Messages.java | 5 +- .../ml/action/TransportOpenJobAction.java | 10 +-- .../ml/action/TransportUpdateJobAction.java | 5 +- .../xpack/ml/datafeed/ProblemTracker.java | 2 +- .../xpack/ml/job/JobManager.java | 73 +++++++++++++------ .../ml/job/UpdateJobProcessNotifier.java | 29 ++++++-- .../output/AutoDetectResultProcessor.java | 4 +- .../xpack/ml/job/JobManagerTests.java | 6 +- .../AutoDetectResultProcessorTests.java | 2 +- .../ml/integration/DetectionRulesIT.java | 21 +++++- .../ml/integration/ScheduledEventsIT.java | 24 ++++-- 13 files changed, 203 insertions(+), 58 deletions(-) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index 78e018ce69c..4cddc74920b 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -49,14 +50,26 @@ public class UpdateJobAction extends Action getUpdateFields() { + Set updateFields = new TreeSet<>(); + if (groups != null) { + updateFields.add(Job.GROUPS.getPreferredName()); + } + if (description != null) { + updateFields.add(Job.DESCRIPTION.getPreferredName()); + } + if (detectorUpdates != null) { + updateFields.add(DETECTORS.getPreferredName()); + } + if (modelPlotConfig != null) { + updateFields.add(Job.MODEL_PLOT_CONFIG.getPreferredName()); + } + if (analysisLimits != null) { + updateFields.add(Job.ANALYSIS_LIMITS.getPreferredName()); + } + if (renormalizationWindowDays != null) { + updateFields.add(Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName()); + } + if (backgroundPersistInterval != null) { + updateFields.add(Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName()); + } + if (modelSnapshotRetentionDays != null) { + updateFields.add(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName()); + } + if (resultsRetentionDays != null) { + updateFields.add(Job.RESULTS_RETENTION_DAYS.getPreferredName()); + } + if (categorizationFilters != null) { + updateFields.add(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName()); + } + if (customSettings != null) { + updateFields.add(Job.CUSTOM_SETTINGS.getPreferredName()); + } + if (modelSnapshotId != null) { + updateFields.add(Job.MODEL_SNAPSHOT_ID.getPreferredName()); + } + if (establishedModelMemory != null) { + updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName()); + } + return updateFields; + } + /** * Updates {@code source} with the new values in this object returning a new {@link Job}. * diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 1b3aa263b26..19941d4e7c4 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -49,8 +49,9 @@ public final class Messages { public static final String INVALID_GROUP = "Invalid group id ''{0}''; must be non-empty string and may contain lowercase alphanumeric" + " (a-z and 0-9), hyphens or underscores; must start and end with alphanumeric"; - public static final String JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again"; + public static final String JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again"; public static final String JOB_AUDIT_CREATED = "Job created"; + public static final String JOB_AUDIT_UPDATED = "Job updated: {0}"; public static final String JOB_AUDIT_CLOSING = "Job is closing"; public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)"; public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time"; @@ -68,6 +69,8 @@ public final class Messages { public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}"; public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''"; public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted"; + public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process"; + public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates"; public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY = diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index f2f1e699bb4..3f011604a96 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -36,6 +36,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -56,10 +60,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.AllocatedPersistentTask; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksExecutor; -import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -429,7 +429,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction 0) { JobUpdate update = new JobUpdate.Builder(job.getId()) .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(job.getId(), update); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, establishedMemoryUpdateListener); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateJobAction.java index 7d915834e9e..3c577bb941b 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateJobAction.java @@ -46,13 +46,12 @@ public class TransportUpdateJobAction extends TransportMasterNodeAction listener) throws Exception { + protected void masterOperation(UpdateJobAction.Request request, ClusterState state, ActionListener listener) { if (request.getJobId().equals(MetaData.ALL)) { throw new IllegalArgumentException("Job Id " + MetaData.ALL + " cannot be for update"); } - jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener); + jobManager.updateJob(request, listener); } @Override diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java index d8001fb69b4..9844631c7b5 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java @@ -85,7 +85,7 @@ class ProblemTracker { public void reportNoneEmptyCount() { if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) { - auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN)); + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN)); } emptyDataCount = 0; } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index d602a18a1b7..df5acd63147 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; @@ -26,12 +25,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -43,7 +44,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; @@ -235,13 +235,13 @@ public class JobManager extends AbstractComponent { jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs); } - public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener actionListener) { - Job job = getJobOrThrowIfUnknown(jobId); - validate(jobUpdate, job, isValid -> { + public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { + Job job = getJobOrThrowIfUnknown(request.getJobId()); + validate(request.getJobUpdate(), job, isValid -> { if (isValid) { - internalJobUpdate(jobId, jobUpdate, request, actionListener); + internalJobUpdate(request, actionListener); } else { - actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + jobId + "]")); + actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + request.getJobId() + "]")); } }, actionListener::onFailure); } @@ -268,12 +268,11 @@ public class JobManager extends AbstractComponent { } } - private void internalJobUpdate(String jobId, JobUpdate jobUpdate, AckedRequest request, - ActionListener actionListener) { - clusterService.submitStateUpdateTask("update-job-" + jobId, + private void internalJobUpdate(UpdateJobAction.Request request, ActionListener actionListener) { + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new AckedClusterStateUpdateTask(request, actionListener) { private volatile Job updatedJob; - private volatile boolean changeWasRequired; + private volatile boolean processUpdateRequired; @Override protected PutJobAction.Response newResponse(boolean acknowledged) { @@ -281,26 +280,33 @@ public class JobManager extends AbstractComponent { } @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Job job = getJobOrThrowIfUnknown(jobId, currentState); - updatedJob = jobUpdate.mergeWithJob(job, maxModelMemoryLimit); + public ClusterState execute(ClusterState currentState) { + Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState); + updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit); if (updatedJob.equals(job)) { // nothing to do return currentState; } // No change is required if the fields that the C++ uses aren't being updated - changeWasRequired = jobUpdate.isAutodetectProcessUpdate(); + processUpdateRequired = request.getJobUpdate().isAutodetectProcessUpdate(); return updateClusterState(updatedJob, true, currentState); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (changeWasRequired) { - if (isJobOpen(newState, jobId)) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate)); - } + JobUpdate jobUpdate = request.getJobUpdate(); + if (processUpdateRequired && isJobOpen(newState, request.getJobId())) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { + auditJobUpdatedIfNotInternal(request); + } + }, e -> { + // No need to do anything + } + )); } else { - logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> { + logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { try { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); @@ -309,18 +315,26 @@ public class JobManager extends AbstractComponent { return "(unprintable due to " + e.getMessage() + ")"; } }); + + auditJobUpdatedIfNotInternal(request); } } }); } + private void auditJobUpdatedIfNotInternal(UpdateJobAction.Request request) { + if (request.isInternal() == false) { + auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_UPDATED, request.getJobUpdate().getUpdateFields())); + } + } + private boolean isJobOpen(ClusterState clusterState, String jobId) { PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); JobState jobState = MlMetadata.getJobState(jobId, persistentTasks); return jobState == JobState.OPENED; } - ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { + private ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { MlMetadata.Builder builder = createMlMetadataBuilder(currentState); builder.putJob(job, overwrite); return buildNewClusterState(currentState, builder); @@ -333,7 +347,14 @@ public class JobManager extends AbstractComponent { if (isJobOpen(clusterState, job.getId())) { Set jobFilters = job.getAnalysisConfig().extractReferencedFilters(); if (jobFilters.contains(filter.getId())) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter)); + updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { + auditor.info(job.getId(), + Messages.getMessage(Messages.JOB_AUDIT_FILTER_UPDATED_ON_PROCESS, filter.getId())); + } + }, e -> {} + )); } } } @@ -345,7 +366,13 @@ public class JobManager extends AbstractComponent { calendarJobIds.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState))); for (String jobId : expandedJobIds) { if (isJobOpen(clusterState, jobId)) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId)); + updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); + } + }, e -> {} + )); } } } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java index 6501c5532b0..b9c795df9b7 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/UpdateJobProcessNotifier.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; @@ -34,7 +35,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local private final Client client; private final ThreadPool threadPool; - private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); + private final LinkedBlockingQueue orderedJobUpdates = new LinkedBlockingQueue<>(1000); private volatile ThreadPool.Cancellable cancellable; @@ -51,8 +52,8 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local }); } - boolean submitJobUpdate(UpdateParams updateParams) { - return orderedJobUpdates.offer(updateParams); + boolean submitJobUpdate(UpdateParams update, ActionListener listener) { + return orderedJobUpdates.offer(new UpdateHolder(update, listener)); } @Override @@ -85,7 +86,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } private void processNextUpdate() { - List updates = new ArrayList<>(orderedJobUpdates.size()); + List updates = new ArrayList<>(orderedJobUpdates.size()); try { orderedJobUpdates.drainTo(updates); executeProcessUpdates(new VolatileCursorIterator<>(updates)); @@ -94,11 +95,12 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } } - void executeProcessUpdates(Iterator updatesIterator) { + void executeProcessUpdates(Iterator updatesIterator) { if (updatesIterator.hasNext() == false) { return; } - UpdateParams update = updatesIterator.next(); + UpdateHolder updateHolder = updatesIterator.next(); + UpdateParams update = updateHolder.update; Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(), update.isUpdateScheduledEvents()); @@ -108,8 +110,11 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local public void onResponse(Response response) { if (response.isUpdated()) { logger.info("Successfully updated remote job [{}]", update.getJobId()); + updateHolder.listener.onResponse(true); } else { - logger.error("Failed to update remote job [{}]", update.getJobId()); + String msg = "Failed to update remote job [" + update.getJobId() + "]"; + logger.error(msg); + updateHolder.listener.onFailure(ExceptionsHelper.serverError(msg)); } executeProcessUpdates(updatesIterator); } @@ -124,9 +129,19 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local } else { logger.error("Failed to update remote job [" + update.getJobId() + "]", e); } + updateHolder.listener.onFailure(e); executeProcessUpdates(updatesIterator); } }); } + private static class UpdateHolder { + private final UpdateParams update; + private final ActionListener listener; + + private UpdateHolder(UpdateParams update, ActionListener listener) { + this.update = update; + this.listener = listener; + } + } } diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 8f26a26f1b4..108bd32026a 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -295,7 +295,7 @@ public class AutoDetectResultProcessor { protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); - UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); try { // This blocks the main processing thread in the unlikely event @@ -328,7 +328,7 @@ public class AutoDetectResultProcessor { jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> { JobUpdate update = new JobUpdate.Builder(jobId) .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { @Override diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index ef77b7fe691..667be5d41ea 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -217,7 +217,7 @@ public class JobManagerTests extends ESTestCase { jobManager.updateProcessOnFilterChanged(filter); ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture()); + verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class)); List capturedUpdateParams = updateParamsCaptor.getAllValues(); assertThat(capturedUpdateParams.size(), equalTo(2)); @@ -256,7 +256,7 @@ public class JobManagerTests extends ESTestCase { jobManager.updateProcessOnCalendarChanged(Arrays.asList("job-1", "job-3", "job-4")); ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture()); + verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class)); List capturedUpdateParams = updateParamsCaptor.getAllValues(); assertThat(capturedUpdateParams.size(), equalTo(2)); @@ -295,7 +295,7 @@ public class JobManagerTests extends ESTestCase { jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1")); ArgumentCaptor updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class); - verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture()); + verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class)); List capturedUpdateParams = updateParamsCaptor.getAllValues(); assertThat(capturedUpdateParams.size(), equalTo(2)); diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index b593c7856e7..29faa1d9c40 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -319,7 +319,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processorUnderTest.processResult(context, result); verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID, + UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 463c1d1e778..28565c5923c 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -5,7 +5,11 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Condition; @@ -148,7 +152,7 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase { assertThat(secondHaldRecordByFieldValues, contains("by_field_value_1", "by_field_value_2")); } - public void testCategoricalRule() throws IOException, InterruptedException { + public void testCategoricalRule() throws Exception { MlFilter safeIps = new MlFilter("safe_ips", Arrays.asList("111.111.111.111", "222.222.222.222")); assertThat(putMlFilter(safeIps), is(true)); @@ -211,8 +215,19 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase { MlFilter updatedFilter = new MlFilter(safeIps.getId(), Collections.singletonList("333.333.333.333")); assertThat(putMlFilter(updatedFilter), is(true)); - // We need to give some time for the update to be applied on the autodetect process - Thread.sleep(1000); + // Wait until the notification that the process was updated is indexed + assertBusy(() -> { + SearchResponse searchResponse = client().prepareSearch(".ml-notifications") + .setSize(1) + .addSort("timestamp", SortOrder.DESC) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("job_id", job.getId())) + .filter(QueryBuilders.termQuery("level", "info")) + ).get(); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertThat(hits.length, equalTo(1)); + assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated filter [safe_ips] in running process")); + }); long secondAnomalyTime = timestamp; // Send another anomalous bucket diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java index 994de5c72a1..6703e4ef236 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -5,7 +5,11 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; @@ -27,6 +31,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { @@ -188,7 +193,7 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { /** * Test an open job picks up changes to scheduled events/calendars */ - public void testOnlineUpdate() throws IOException, InterruptedException { + public void testOnlineUpdate() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(30); Job.Builder job = createJob("scheduled-events-online-update", bucketSpan); @@ -216,10 +221,19 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { postScheduledEvents(calendarId, events); - // The update process action is aysnc so give it chance to update - // the job with the added scheduled events - // TODO Wait for the task to finish once #3767 is implemented - Thread.sleep(1000); + // Wait until the notification that the process was updated is indexed + assertBusy(() -> { + SearchResponse searchResponse = client().prepareSearch(".ml-notifications") + .setSize(1) + .addSort("timestamp", SortOrder.DESC) + .setQuery(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("job_id", job.getId())) + .filter(QueryBuilders.termQuery("level", "info")) + ).get(); + SearchHit[] hits = searchResponse.getHits().getHits(); + assertThat(hits.length, equalTo(1)); + assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process")); + }); // write some more buckets of data that cover the scheduled event period postData(job.getId(), generateData(startTime + bucketCount * bucketSpan.millis(), bucketSpan, 5,