[ML] fix updating opened jobs scheduled events (#31651) (#32881)

* ML: fix updating opened jobs scheduled events (#31651)

* Adding UpdateParamsTests license header

* Adding integration test and addressing PR comments

* addressing test and job names
This commit is contained in:
Benjamin Trent 2018-08-17 07:21:17 -05:00 committed by GitHub
parent a08127c072
commit 9cec4aa14b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 4 deletions

View File

@ -258,7 +258,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
} }
public boolean isAutodetectProcessUpdate() { public boolean isAutodetectProcessUpdate() {
return modelPlotConfig != null || detectorUpdates != null; return modelPlotConfig != null || detectorUpdates != null || groups != null;
} }
@Override @Override

View File

@ -274,6 +274,8 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
assertTrue(update.isAutodetectProcessUpdate()); assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build(); update = new JobUpdate.Builder("foo").setDetectorUpdates(Collections.singletonList(mock(JobUpdate.DetectorUpdate.class))).build();
assertTrue(update.isAutodetectProcessUpdate()); assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder("foo").setGroups(Arrays.asList("bar")).build();
assertTrue(update.isAutodetectProcessUpdate());
} }
public void testUpdateAnalysisLimitWithValueGreaterThanMax() { public void testUpdateAnalysisLimitWithValueGreaterThanMax() {

View File

@ -66,6 +66,7 @@ public final class UpdateParams {
return new Builder(jobUpdate.getJobId()) return new Builder(jobUpdate.getJobId())
.modelPlotConfig(jobUpdate.getModelPlotConfig()) .modelPlotConfig(jobUpdate.getModelPlotConfig())
.detectorUpdates(jobUpdate.getDetectorUpdates()) .detectorUpdates(jobUpdate.getDetectorUpdates())
.updateScheduledEvents(jobUpdate.getGroups() != null)
.build(); .build();
} }

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class UpdateParamsTests extends ESTestCase {
public void testFromJobUpdate() {
String jobId = "foo";
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
new RuleCondition(RuleCondition.AppliesTo.ACTUAL,
Operator.GT, 1.0))).build();
List<DetectionRule> rules = Arrays.asList(rule);
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(2, null, rules));
JobUpdate.Builder updateBuilder = new JobUpdate.Builder(jobId)
.setModelPlotConfig(new ModelPlotConfig())
.setDetectorUpdates(detectorUpdates);
UpdateParams params = UpdateParams.fromJobUpdate(updateBuilder.build());
assertFalse(params.isUpdateScheduledEvents());
assertEquals(params.getDetectorUpdates(), updateBuilder.build().getDetectorUpdates());
assertEquals(params.getModelPlotConfig(), updateBuilder.build().getModelPlotConfig());
params = UpdateParams.fromJobUpdate(updateBuilder.setGroups(Arrays.asList("bar")).build());
assertTrue(params.isUpdateScheduledEvents());
}
}

View File

@ -12,11 +12,13 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.junit.After; import org.junit.After;
@ -193,9 +195,9 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
/** /**
* Test an open job picks up changes to scheduled events/calendars * Test an open job picks up changes to scheduled events/calendars
*/ */
public void testOnlineUpdate() throws Exception { public void testAddEventsToOpenJob() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30); TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
Job.Builder job = createJob("scheduled-events-online-update", bucketSpan); Job.Builder job = createJob("scheduled-events-add-events-to-open-job", bucketSpan);
long startTime = 1514764800000L; long startTime = 1514764800000L;
final int bucketCount = 5; final int bucketCount = 5;
@ -209,7 +211,7 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
// Now create a calendar and events for the job while it is open // Now create a calendar and events for the job while it is open
String calendarId = "test-calendar-online-update"; String calendarId = "test-calendar-online-update";
putCalendar(calendarId, Collections.singletonList(job.getId()), "testOnlineUpdate calendar"); putCalendar(calendarId, Collections.singletonList(job.getId()), "testAddEventsToOpenJob calendar");
List<ScheduledEvent> events = new ArrayList<>(); List<ScheduledEvent> events = new ArrayList<>();
long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis(); long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis();
@ -257,6 +259,81 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
assertEquals(0, buckets.get(8).getScheduledEvents().size()); assertEquals(0, buckets.get(8).getScheduledEvents().size());
} }
/**
* An open job that later gets added to a calendar, should take the scheduled events into account
*/
public void testAddOpenedJobToGroupWithCalendar() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
String groupName = "opened-calendar-job-group";
Job.Builder job = createJob("scheduled-events-add-opened-job-to-group-with-calendar", bucketSpan);
long startTime = 1514764800000L;
final int bucketCount = 5;
// Open the job
openJob(job.getId());
// write some buckets of data
postData(job.getId(), generateData(startTime, bucketSpan, bucketCount, bucketIndex -> randomIntBetween(100, 200))
.stream().collect(Collectors.joining()));
String calendarId = "test-calendar-open-job-update";
// Create a new calendar referencing groupName
putCalendar(calendarId, Collections.singletonList(groupName), "testAddOpenedJobToGroupWithCalendar calendar");
// Put events in the calendar
List<ScheduledEvent> events = new ArrayList<>();
long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis();
long eventEndTime = eventStartTime + (long)(1.5 * bucketSpan.millis());
events.add(new ScheduledEvent.Builder().description("Some Event")
.startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventStartTime), ZoneOffset.UTC))
.endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventEndTime), ZoneOffset.UTC))
.calendarId(calendarId).build());
postScheduledEvents(calendarId, events);
// Update the job to be a member of the group
UpdateJobAction.Request jobUpdateRequest = new UpdateJobAction.Request(job.getId(),
new JobUpdate.Builder(job.getId()).setGroups(Collections.singletonList(groupName)).build());
client().execute(UpdateJobAction.INSTANCE, jobUpdateRequest).actionGet();
// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
).get();
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Job updated: [groups]"));
});
// write some more buckets of data that cover the scheduled event period
postData(job.getId(), generateData(startTime + bucketCount * bucketSpan.millis(), bucketSpan, 5,
bucketIndex -> randomIntBetween(100, 200))
.stream().collect(Collectors.joining()));
// and close
closeJob(job.getId());
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId());
List<Bucket> buckets = getBuckets(getBucketsRequest);
// the first 6 buckets have no events
for (int i=0; i<=bucketCount; i++) {
assertEquals(0, buckets.get(i).getScheduledEvents().size());
}
// 7th and 8th buckets have the event but the last one does not
assertEquals(1, buckets.get(6).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(6).getScheduledEvents().get(0));
assertEquals(1, buckets.get(7).getScheduledEvents().size());
assertEquals("Some Event", buckets.get(7).getScheduledEvents().get(0));
assertEquals(0, buckets.get(8).getScheduledEvents().size());
}
private Job.Builder createJob(String jobId, TimeValue bucketSpan) { private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
Detector.Builder detector = new Detector.Builder("count", null); Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));