diff --git a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java index 9d42efd8eb2..8e326e3556b 100644 --- a/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java @@ -5,26 +5,16 @@ */ package org.elasticsearch.xpack.core.ml.integration; -import org.apache.http.HttpStatus; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ESRestTestCase; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertEquals; - - public class MlRestTestStateCleaner { private final Logger logger; diff --git a/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java b/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java index 82dd2320c03..4e51c93f2ad 100644 --- a/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java +++ b/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.security.SecurityField; import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken; import org.elasticsearch.xpack.core.security.client.SecurityClient; import org.elasticsearch.xpack.security.LocalStateSecurity; + import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index 36f578bf08f..76ad45c78f2 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -107,19 +107,4 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { putJob(job); return job; } - - private static List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, - Function timeToCountFunction) throws IOException { - List data = new ArrayList<>(); - long now = timestamp; - for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) { - for (int count = 0; count < timeToCountFunction.apply(bucketIndex); count++) { - Map record = new HashMap<>(); - record.put("time", now); - data.add(createJsonRecord(record)); - } - now += bucketSpan.getMillis(); - } - return data; - } } diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index 9304f765c25..c6e2861e1fd 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -49,7 +49,9 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; +import org.elasticsearch.xpack.core.ml.action.PutCalendarAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -58,6 +60,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.calendars.Calendar; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -87,9 +91,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -415,6 +421,16 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { return response.isAcknowledged(); } + protected PutCalendarAction.Response putCalendar(String calendarId, List jobIds, String description) { + PutCalendarAction.Request request = new PutCalendarAction.Request(new Calendar(calendarId, jobIds, description)); + return client().execute(PutCalendarAction.INSTANCE, request).actionGet(); + } + + protected PostCalendarEventsAction.Response postScheduledEvents(String calendarId, List events) { + PostCalendarEventsAction.Request request = new PostCalendarEventsAction.Request(calendarId, events); + return client().execute(PostCalendarEventsAction.INSTANCE, request).actionGet(); + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { @@ -469,6 +485,21 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { } } + protected List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, + Function timeToCountFunction) throws IOException { + List data = new ArrayList<>(); + long now = timestamp; + for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) { + for (int count = 0; count < timeToCountFunction.apply(bucketIndex); count++) { + Map record = new HashMap<>(); + record.put("time", now); + data.add(createJsonRecord(record)); + } + now += bucketSpan.getMillis(); + } + return data; + } + protected static String createJsonRecord(Map keyValueMap) throws IOException { return JsonXContent.contentBuilder().map(keyValueMap).string() + "\n"; } diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java new file mode 100644 index 00000000000..8f005fe0e13 --- /dev/null +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java @@ -0,0 +1,260 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; +import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase { + public void testScheduledEvents() throws IOException { + + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + Job.Builder job = createJob("scheduled-events", bucketSpan); + String calendarId = "test-calendar"; + putCalendar(calendarId, Collections.singletonList(job.getId()), "testScheduledEvents calendar"); + + long startTime = 1514764800000L; + + List events = new ArrayList<>(); + long firstEventStartTime = 1514937600000L; + long firstEventEndTime = firstEventStartTime + 2 * 60 * 60 * 1000; + events.add(new ScheduledEvent.Builder().description("1st event (2hr)") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(firstEventStartTime), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(firstEventEndTime), ZoneOffset.UTC)) + .calendarId(calendarId).build()); + // add 10 min event smaller than the bucket + long secondEventStartTime = 1515067200000L; + long secondEventEndTime = secondEventStartTime + 10 * 60 * 1000; + events.add(new ScheduledEvent.Builder().description("2nd event with period smaller than bucketspan") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(secondEventStartTime), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(secondEventEndTime), ZoneOffset.UTC)) + .calendarId(calendarId).build()); + long thirdEventStartTime = 1515088800000L; + long thirdEventEndTime = thirdEventStartTime + 3 * 60 * 60 * 1000; + events.add(new ScheduledEvent.Builder().description("3rd event 3hr") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(thirdEventStartTime), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(thirdEventEndTime), ZoneOffset.UTC)) + .calendarId(calendarId).build()); + + postScheduledEvents(calendarId, events); + + // Run 6 days of data + runJob(job, startTime, bucketSpan, 2 * 24 * 6); + + // Check tags on the buckets during the first event + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId()); + getBucketsRequest.setStart(Long.toString(firstEventStartTime)); + getBucketsRequest.setEnd(Long.toString(firstEventEndTime)); + List buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + assertEquals(1, bucket.getScheduledEvents().size()); + assertEquals("1st event (2hr)", bucket.getScheduledEvents().get(0)); + assertEquals(0.0, bucket.getAnomalyScore(), 0.00001); + } + + // Following buckets have 0 events + getBucketsRequest = new GetBucketsAction.Request(job.getId()); + getBucketsRequest.setStart(Long.toString(firstEventEndTime)); + getBucketsRequest.setEnd(Long.toString(secondEventStartTime)); + buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + assertEquals(0, bucket.getScheduledEvents().size()); + } + + // The second event bucket + getBucketsRequest.setStart(Long.toString(secondEventStartTime)); + getBucketsRequest.setEnd(Long.toString(secondEventEndTime)); + buckets = getBuckets(getBucketsRequest); + assertEquals(1, buckets.size()); + for (Bucket bucket : buckets) { + assertEquals(1, bucket.getScheduledEvents().size()); + assertEquals("2nd event with period smaller than bucketspan", bucket.getScheduledEvents().get(0)); + assertEquals(0.0, bucket.getAnomalyScore(), 0.00001); + } + + // Following buckets have 0 events + getBucketsRequest.setStart(Long.toString(secondEventEndTime)); + getBucketsRequest.setEnd(Long.toString(thirdEventStartTime)); + buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + assertEquals(0, bucket.getScheduledEvents().size()); + } + + // The 3rd event buckets + getBucketsRequest.setStart(Long.toString(thirdEventStartTime)); + getBucketsRequest.setEnd(Long.toString(thirdEventEndTime)); + buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + assertEquals(1, bucket.getScheduledEvents().size()); + assertEquals("3rd event 3hr", bucket.getScheduledEvents().get(0)); + assertEquals(0.0, bucket.getAnomalyScore(), 0.00001); + } + + // Following buckets have 0 events + getBucketsRequest.setStart(Long.toString(thirdEventEndTime)); + getBucketsRequest.setEnd(null); + buckets = getBuckets(getBucketsRequest); + for (Bucket bucket : buckets) { + assertEquals(0, bucket.getScheduledEvents().size()); + } + + // It is unlikely any anomaly records have been created but + // ensure there are non present anyway + GetRecordsAction.Request getRecordsRequest = new GetRecordsAction.Request(job.getId()); + getRecordsRequest.setStart(Long.toString(firstEventStartTime)); + getRecordsRequest.setEnd(Long.toString(firstEventEndTime)); + List records = getRecords(getRecordsRequest); + assertThat(records, is(empty())); + + getRecordsRequest.setStart(Long.toString(secondEventStartTime)); + getRecordsRequest.setEnd(Long.toString(secondEventEndTime)); + records = getRecords(getRecordsRequest); + assertThat(records, is(empty())); + + getRecordsRequest.setStart(Long.toString(thirdEventStartTime)); + getRecordsRequest.setEnd(Long.toString(thirdEventEndTime)); + records = getRecords(getRecordsRequest); + assertThat(records, is(empty())); + } + + public void testScheduledEventWithInterimResults() throws IOException { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + Job.Builder job = createJob("scheduled-events-interim-results", bucketSpan); + String calendarId = "test-calendar"; + putCalendar(calendarId, Collections.singletonList(job.getId()), "testScheduledEventWithInterimResults calendar"); + + long startTime = 1514764800000L; + + List events = new ArrayList<>(); + // The event starts 10 buckets in and lasts for 2 + int bucketCount = 10; + long firstEventStartTime = startTime + bucketSpan.millis() * bucketCount; + long firstEventEndTime = firstEventStartTime + bucketSpan.millis() * 2; + events.add(new ScheduledEvent.Builder().description("1st event 2hr") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(firstEventStartTime), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(firstEventEndTime), ZoneOffset.UTC)) + .calendarId(calendarId).build()); + postScheduledEvents(calendarId, events); + + + openJob(job.getId()); + // write data up to and including the event + postData(job.getId(), generateData(startTime, bucketSpan, bucketCount + 1, bucketIndex -> randomIntBetween(100, 200)) + .stream().collect(Collectors.joining())); + + // flush the job and get the interim result during the event + flushJob(job.getId(), true); + + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId()); + getBucketsRequest.setStart(Long.toString(firstEventStartTime)); + List buckets = getBuckets(getBucketsRequest); + assertEquals(1, buckets.size()); + assertTrue(buckets.get(0).isInterim()); + assertEquals(1, buckets.get(0).getScheduledEvents().size()); + assertEquals("1st event 2hr", buckets.get(0).getScheduledEvents().get(0)); + assertEquals(0.0, buckets.get(0).getAnomalyScore(), 0.00001); + } + + /** + * Test an open job picks up changes to scheduled events/calendars + */ + public void testOnlineUpdate() throws IOException, InterruptedException { + TimeValue bucketSpan = TimeValue.timeValueMinutes(30); + Job.Builder job = createJob("scheduled-events-online-update", bucketSpan); + + long startTime = 1514764800000L; + final int bucketCount = 5; + + // Open the job + openJob(job.getId()); + + // write some buckets of data + postData(job.getId(), generateData(startTime, bucketSpan, bucketCount, bucketIndex -> randomIntBetween(100, 200)) + .stream().collect(Collectors.joining())); + + // Now create a calendar and events for the job while it is open + String calendarId = "test-calendar-online-update"; + putCalendar(calendarId, Collections.singletonList(job.getId()), "testOnlineUpdate calendar"); + + List events = new ArrayList<>(); + long eventStartTime = startTime + (bucketCount + 1) * bucketSpan.millis(); + long eventEndTime = eventStartTime + (long)(1.5 * bucketSpan.millis()); + events.add(new ScheduledEvent.Builder().description("Some Event") + .startTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventStartTime), ZoneOffset.UTC)) + .endTime(ZonedDateTime.ofInstant(Instant.ofEpochMilli(eventEndTime), ZoneOffset.UTC)) + .calendarId(calendarId).build()); + + postScheduledEvents(calendarId, events); + + // The update process action is aysnc so give it chance to update + // the job with the added scheduled events + // TODO Wait for the task to finish once #3767 is implemented + Thread.sleep(1000); + + // write some more buckets of data that cover the scheduled event period + postData(job.getId(), generateData(startTime + bucketCount * bucketSpan.millis(), bucketSpan, 5, + bucketIndex -> randomIntBetween(100, 200)) + .stream().collect(Collectors.joining())); + // and close + closeJob(job.getId()); + + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId()); + List buckets = getBuckets(getBucketsRequest); + + // the first buckets have no events + for (int i=0; i<=bucketCount; i++) { + assertEquals(0, buckets.get(i).getScheduledEvents().size()); + } + // 7th and 8th buckets have the event + assertEquals(1, buckets.get(6).getScheduledEvents().size()); + assertEquals("Some Event", buckets.get(6).getScheduledEvents().get(0)); + assertEquals(1, buckets.get(7).getScheduledEvents().size()); + assertEquals("Some Event", buckets.get(7).getScheduledEvents().get(0)); + assertEquals(0, buckets.get(8).getScheduledEvents().size()); + } + + private Job.Builder createJob(String jobId, TimeValue bucketSpan) { + Detector.Builder detector = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(bucketSpan); + Job.Builder job = new Job.Builder(jobId); + job.setAnalysisConfig(analysisConfig); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + job.setDataDescription(dataDescription); + putJob(job); + // register for clean up + registerJob(job); + + return job; + } + + private void runJob(Job.Builder job, long startTime, TimeValue bucketSpan, int bucketCount) throws IOException { + openJob(job.getId()); + postData(job.getId(), generateData(startTime, bucketSpan, bucketCount, bucketIndex -> randomIntBetween(100, 200)) + .stream().collect(Collectors.joining())); + closeJob(job.getId()); + } +}