[ML] Add notification for job updates coming from a user action (elastic/x-pack-elasticsearch#3890)

We were missing a notification for when a job is updated. This is
useful so users know that there's been changes which could justify
a change in the job behaviour.

In addition, having those notifications allows our integrations
tests to know when the update was processed which avoids having
to use `sleep()` with its instabilities.



Original commit: elastic/x-pack-elasticsearch@0b4eda2232
This commit is contained in:
Dimitris Athanasiou 2018-02-13 18:46:00 +00:00 committed by GitHub
parent 938cf239c9
commit 2f4dcf36a9
13 changed files with 203 additions and 58 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
@ -49,14 +50,26 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
private String jobId;
private JobUpdate update;
/** Indicates an update that was not triggered by a user */
private boolean isInternal;
public Request(String jobId, JobUpdate update) {
this(jobId, update, false);
}
private Request(String jobId, JobUpdate update, boolean isInternal) {
this.jobId = jobId;
this.update = update;
this.isInternal = isInternal;
}
public Request() {
}
public static Request internal(String jobId, JobUpdate update) {
return new Request(jobId, update, true);
}
public String getJobId() {
return jobId;
}
@ -65,6 +78,10 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
return update;
}
public boolean isInternal() {
return isInternal;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -75,6 +92,11 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
super.readFrom(in);
jobId = in.readString();
update = new JobUpdate(in);
if (in.getVersion().onOrAfter(Version.V_6_2_2)) {
isInternal = in.readBoolean();
} else {
isInternal = false;
}
}
@Override
@ -82,6 +104,9 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
super.writeTo(out);
out.writeString(jobId);
update.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
out.writeBoolean(isInternal);
}
}
@Override
@ -95,14 +120,15 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateJobAction.Request request = (UpdateJobAction.Request) o;
return Objects.equals(jobId, request.jobId) &&
Objects.equals(update, request.update);
UpdateJobAction.Request that = (UpdateJobAction.Request) o;
return Objects.equals(jobId, that.jobId) &&
Objects.equals(update, that.update) &&
isInternal == that.isInternal;
}
@Override
public int hashCode() {
return Objects.hash(jobId, update);
return Objects.hash(jobId, update, isInternal);
}
@Override

View File

@ -26,6 +26,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
public class JobUpdate implements Writeable, ToXContentObject {
public static final ParseField DETECTORS = new ParseField("detectors");
@ -267,6 +269,50 @@ public class JobUpdate implements Writeable, ToXContentObject {
return builder;
}
public Set<String> getUpdateFields() {
Set<String> updateFields = new TreeSet<>();
if (groups != null) {
updateFields.add(Job.GROUPS.getPreferredName());
}
if (description != null) {
updateFields.add(Job.DESCRIPTION.getPreferredName());
}
if (detectorUpdates != null) {
updateFields.add(DETECTORS.getPreferredName());
}
if (modelPlotConfig != null) {
updateFields.add(Job.MODEL_PLOT_CONFIG.getPreferredName());
}
if (analysisLimits != null) {
updateFields.add(Job.ANALYSIS_LIMITS.getPreferredName());
}
if (renormalizationWindowDays != null) {
updateFields.add(Job.RENORMALIZATION_WINDOW_DAYS.getPreferredName());
}
if (backgroundPersistInterval != null) {
updateFields.add(Job.BACKGROUND_PERSIST_INTERVAL.getPreferredName());
}
if (modelSnapshotRetentionDays != null) {
updateFields.add(Job.MODEL_SNAPSHOT_RETENTION_DAYS.getPreferredName());
}
if (resultsRetentionDays != null) {
updateFields.add(Job.RESULTS_RETENTION_DAYS.getPreferredName());
}
if (categorizationFilters != null) {
updateFields.add(AnalysisConfig.CATEGORIZATION_FILTERS.getPreferredName());
}
if (customSettings != null) {
updateFields.add(Job.CUSTOM_SETTINGS.getPreferredName());
}
if (modelSnapshotId != null) {
updateFields.add(Job.MODEL_SNAPSHOT_ID.getPreferredName());
}
if (establishedModelMemory != null) {
updateFields.add(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName());
}
return updateFields;
}
/**
* Updates {@code source} with the new values in this object returning a new {@link Job}.
*

View File

@ -49,8 +49,9 @@ public final class Messages {
public static final String INVALID_GROUP = "Invalid group id ''{0}''; must be non-empty string and may contain lowercase alphanumeric" +
" (a-z and 0-9), hyphens or underscores; must start and end with alphanumeric";
public static final String JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again";
public static final String JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN = "Datafeed has started retrieving data again";
public static final String JOB_AUDIT_CREATED = "Job created";
public static final String JOB_AUDIT_UPDATED = "Job updated: {0}";
public static final String JOB_AUDIT_CLOSING = "Job is closing";
public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)";
public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time";
@ -68,6 +69,8 @@ public final class Messages {
public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}";
public static final String JOB_AUDIT_REVERTED = "Job model snapshot reverted to ''{0}''";
public static final String JOB_AUDIT_SNAPSHOT_DELETED = "Model snapshot [{0}] with description ''{1}'' deleted";
public static final String JOB_AUDIT_FILTER_UPDATED_ON_PROCESS = "Updated filter [{0}] in running process";
public static final String JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS = "Updated calendars in running process";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =

View File

@ -36,6 +36,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
@ -56,10 +60,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -429,7 +429,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
if (establishedModelMemory != null && establishedModelMemory > 0) {
JobUpdate update = new JobUpdate.Builder(job.getId())
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(job.getId(), update);
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(job.getId(), update);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
establishedMemoryUpdateListener);

View File

@ -46,13 +46,12 @@ public class TransportUpdateJobAction extends TransportMasterNodeAction<UpdateJo
}
@Override
protected void masterOperation(UpdateJobAction.Request request, ClusterState state,
ActionListener<PutJobAction.Response> listener) throws Exception {
protected void masterOperation(UpdateJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> listener) {
if (request.getJobId().equals(MetaData.ALL)) {
throw new IllegalArgumentException("Job Id " + MetaData.ALL + " cannot be for update");
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener);
jobManager.updateJob(request, listener);
}
@Override

View File

@ -85,7 +85,7 @@ class ProblemTracker {
public void reportNoneEmptyCount() {
if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIR_DATAFEED_DATA_SEEN_AGAIN));
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN));
}
emptyDataCount = 0;
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
@ -26,12 +25,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -43,7 +44,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
@ -235,13 +235,13 @@ public class JobManager extends AbstractComponent {
jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs);
}
public void updateJob(String jobId, JobUpdate jobUpdate, AckedRequest request, ActionListener<PutJobAction.Response> actionListener) {
Job job = getJobOrThrowIfUnknown(jobId);
validate(jobUpdate, job, isValid -> {
public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
Job job = getJobOrThrowIfUnknown(request.getJobId());
validate(request.getJobUpdate(), job, isValid -> {
if (isValid) {
internalJobUpdate(jobId, jobUpdate, request, actionListener);
internalJobUpdate(request, actionListener);
} else {
actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + jobId + "]"));
actionListener.onFailure(new IllegalArgumentException("Invalid update to job [" + request.getJobId() + "]"));
}
}, actionListener::onFailure);
}
@ -268,12 +268,11 @@ public class JobManager extends AbstractComponent {
}
}
private void internalJobUpdate(String jobId, JobUpdate jobUpdate, AckedRequest request,
ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId,
private void internalJobUpdate(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private volatile Job updatedJob;
private volatile boolean changeWasRequired;
private volatile boolean processUpdateRequired;
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
@ -281,26 +280,33 @@ public class JobManager extends AbstractComponent {
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Job job = getJobOrThrowIfUnknown(jobId, currentState);
updatedJob = jobUpdate.mergeWithJob(job, maxModelMemoryLimit);
public ClusterState execute(ClusterState currentState) {
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
if (updatedJob.equals(job)) {
// nothing to do
return currentState;
}
// No change is required if the fields that the C++ uses aren't being updated
changeWasRequired = jobUpdate.isAutodetectProcessUpdate();
processUpdateRequired = request.getJobUpdate().isAutodetectProcessUpdate();
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (changeWasRequired) {
if (isJobOpen(newState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate));
}
JobUpdate jobUpdate = request.getJobUpdate();
if (processUpdateRequired && isJobOpen(newState, request.getJobId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(
isUpdated -> {
if (isUpdated) {
auditJobUpdatedIfNotInternal(request);
}
}, e -> {
// No need to do anything
}
));
} else {
logger.debug("[{}] Ignored job update with no changes: {}", () -> jobId, () -> {
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
@ -309,18 +315,26 @@ public class JobManager extends AbstractComponent {
return "(unprintable due to " + e.getMessage() + ")";
}
});
auditJobUpdatedIfNotInternal(request);
}
}
});
}
private void auditJobUpdatedIfNotInternal(UpdateJobAction.Request request) {
if (request.isInternal() == false) {
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_UPDATED, request.getJobUpdate().getUpdateFields()));
}
}
private boolean isJobOpen(ClusterState clusterState, String jobId) {
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlMetadata.getJobState(jobId, persistentTasks);
return jobState == JobState.OPENED;
}
ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) {
private ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.putJob(job, overwrite);
return buildNewClusterState(currentState, builder);
@ -333,7 +347,14 @@ public class JobManager extends AbstractComponent {
if (isJobOpen(clusterState, job.getId())) {
Set<String> jobFilters = job.getAnalysisConfig().extractReferencedFilters();
if (jobFilters.contains(filter.getId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter));
updateJobProcessNotifier.submitJobUpdate(UpdateParams.filterUpdate(job.getId(), filter), ActionListener.wrap(
isUpdated -> {
if (isUpdated) {
auditor.info(job.getId(),
Messages.getMessage(Messages.JOB_AUDIT_FILTER_UPDATED_ON_PROCESS, filter.getId()));
}
}, e -> {}
));
}
}
}
@ -345,7 +366,13 @@ public class JobManager extends AbstractComponent {
calendarJobIds.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
for (String jobId : expandedJobIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId));
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap(
isUpdated -> {
if (isUpdated) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS));
}
}, e -> {}
));
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
@ -34,7 +35,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
private final Client client;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<UpdateParams> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);
private volatile ThreadPool.Cancellable cancellable;
@ -51,8 +52,8 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
});
}
boolean submitJobUpdate(UpdateParams updateParams) {
return orderedJobUpdates.offer(updateParams);
boolean submitJobUpdate(UpdateParams update, ActionListener<Boolean> listener) {
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
}
@Override
@ -85,7 +86,7 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
private void processNextUpdate() {
List<UpdateParams> updates = new ArrayList<>(orderedJobUpdates.size());
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
try {
orderedJobUpdates.drainTo(updates);
executeProcessUpdates(new VolatileCursorIterator<>(updates));
@ -94,11 +95,12 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
}
}
void executeProcessUpdates(Iterator<UpdateParams> updatesIterator) {
void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {
if (updatesIterator.hasNext() == false) {
return;
}
UpdateParams update = updatesIterator.next();
UpdateHolder updateHolder = updatesIterator.next();
UpdateParams update = updateHolder.update;
Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents());
@ -108,8 +110,11 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
public void onResponse(Response response) {
if (response.isUpdated()) {
logger.info("Successfully updated remote job [{}]", update.getJobId());
updateHolder.listener.onResponse(true);
} else {
logger.error("Failed to update remote job [{}]", update.getJobId());
String msg = "Failed to update remote job [" + update.getJobId() + "]";
logger.error(msg);
updateHolder.listener.onFailure(ExceptionsHelper.serverError(msg));
}
executeProcessUpdates(updatesIterator);
}
@ -124,9 +129,19 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
} else {
logger.error("Failed to update remote job [" + update.getJobId() + "]", e);
}
updateHolder.listener.onFailure(e);
executeProcessUpdates(updatesIterator);
}
});
}
private static class UpdateHolder {
private final UpdateParams update;
private final ActionListener<Boolean> listener;
private UpdateHolder(UpdateParams update, ActionListener<Boolean> listener) {
this.update = update;
this.listener = listener;
}
}
}

View File

@ -295,7 +295,7 @@ public class AutoDetectResultProcessor {
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
try {
// This blocks the main processing thread in the unlikely event
@ -328,7 +328,7 @@ public class AutoDetectResultProcessor {
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
JobUpdate update = new JobUpdate.Builder(jobId)
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override

View File

@ -217,7 +217,7 @@ public class JobManagerTests extends ESTestCase {
jobManager.updateProcessOnFilterChanged(filter);
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class));
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
assertThat(capturedUpdateParams.size(), equalTo(2));
@ -256,7 +256,7 @@ public class JobManagerTests extends ESTestCase {
jobManager.updateProcessOnCalendarChanged(Arrays.asList("job-1", "job-3", "job-4"));
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class));
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
assertThat(capturedUpdateParams.size(), equalTo(2));
@ -295,7 +295,7 @@ public class JobManagerTests extends ESTestCase {
jobManager.updateProcessOnCalendarChanged(Collections.singletonList("group-1"));
ArgumentCaptor<UpdateParams> updateParamsCaptor = ArgumentCaptor.forClass(UpdateParams.class);
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture());
verify(updateJobProcessNotifier, times(2)).submitJobUpdate(updateParamsCaptor.capture(), any(ActionListener.class));
List<UpdateParams> capturedUpdateParams = updateParamsCaptor.getAllValues();
assertThat(capturedUpdateParams.size(), equalTo(2));

View File

@ -319,7 +319,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID,
UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());

View File

@ -5,7 +5,11 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Condition;
@ -148,7 +152,7 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
assertThat(secondHaldRecordByFieldValues, contains("by_field_value_1", "by_field_value_2"));
}
public void testCategoricalRule() throws IOException, InterruptedException {
public void testCategoricalRule() throws Exception {
MlFilter safeIps = new MlFilter("safe_ips", Arrays.asList("111.111.111.111", "222.222.222.222"));
assertThat(putMlFilter(safeIps), is(true));
@ -211,8 +215,19 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
MlFilter updatedFilter = new MlFilter(safeIps.getId(), Collections.singletonList("333.333.333.333"));
assertThat(putMlFilter(updatedFilter), is(true));
// We need to give some time for the update to be applied on the autodetect process
Thread.sleep(1000);
// Wait until the notification that the process was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
).get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated filter [safe_ips] in running process"));
});
long secondAnomalyTime = timestamp;
// Send another anomalous bucket

View File

@ -5,7 +5,11 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
@ -27,6 +31,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
@ -188,7 +193,7 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
/**
* Test an open job picks up changes to scheduled events/calendars
*/
public void testOnlineUpdate() throws IOException, InterruptedException {
public void testOnlineUpdate() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
Job.Builder job = createJob("scheduled-events-online-update", bucketSpan);
@ -216,10 +221,19 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
postScheduledEvents(calendarId, events);
// The update process action is aysnc so give it chance to update
// the job with the added scheduled events
// TODO Wait for the task to finish once #3767 is implemented
Thread.sleep(1000);
// Wait until the notification that the process was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
).get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process"));
});
// write some more buckets of data that cover the scheduled event period
postData(job.getId(), generateData(startTime + bucketCount * bucketSpan.millis(), bucketSpan, 5,