From aa25704170715018761e7cef6fbe050b1c4d2ef3 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 10 Jan 2018 10:57:09 +0000 Subject: [PATCH] [ML] Add scheduled_events field to bucket (elastic/x-pack-elasticsearch#3513) Adds `scheduled_events` to the bucket object so it can read the corresponding field written from the autodetect process. Relates elastic/x-pack-elasticsearch#3016 Original commit: elastic/x-pack-elasticsearch@3f35f867f055b78977e6b796107b76140175044b --- .../persistence/ElasticsearchMappings.java | 3 ++ .../xpack/ml/job/results/Bucket.java | 31 +++++++++++++++++-- .../ml/job/results/ReservedFieldNames.java | 1 + .../xpack/ml/job/results/BucketTests.java | 17 ++++++++++ 4 files changed, 50 insertions(+), 2 deletions(-) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 6138e5a3160..1313286107e 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -224,6 +224,9 @@ public class ElasticsearchMappings { .startObject(Bucket.PROCESSING_TIME_MS.getPreferredName()) .field(TYPE, LONG) .endObject() + .startObject(Bucket.SCHEDULED_EVENTS.getPreferredName()) + .field(TYPE, KEYWORD) + .endObject() .startObject(Bucket.PARTITION_SCORES.getPreferredName()) .field(TYPE, NESTED) .startObject(PROPERTIES) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java index 71ae54f0ae5..cb796497f47 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/Bucket.java @@ -44,6 +44,7 @@ public class Bucket implements ToXContentObject, Writeable { public static final ParseField BUCKET_SPAN = new ParseField("bucket_span"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms"); public static final ParseField PARTITION_SCORES = new ParseField("partition_scores"); + public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events"); // Only exists for backwards compatibility; no longer added to mappings private static final ParseField RECORD_COUNT = new ParseField("record_count"); @@ -83,6 +84,7 @@ public class Bucket implements ToXContentObject, Writeable { PARSER.declareString((bucket, s) -> {}, Result.RESULT_TYPE); // For bwc with 5.4 PARSER.declareInt((bucket, recordCount) -> {}, RECORD_COUNT); + PARSER.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS); } private final String jobId; @@ -96,6 +98,7 @@ public class Bucket implements ToXContentObject, Writeable { private List bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to private long processingTimeMs; private List partitionScores = Collections.emptyList(); + private List scheduledEvents = Collections.emptyList(); public Bucket(String jobId, Date timestamp, long bucketSpan) { this.jobId = jobId; @@ -115,6 +118,7 @@ public class Bucket implements ToXContentObject, Writeable { this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers); this.processingTimeMs = other.processingTimeMs; this.partitionScores = new ArrayList<>(other.partitionScores); + this.scheduledEvents = new ArrayList<>(other.scheduledEvents); } public Bucket(StreamInput in) throws IOException { @@ -137,6 +141,14 @@ public class Bucket implements ToXContentObject, Writeable { in.readGenericValue(); } partitionScores = in.readList(PartitionScore::new); + if (in.getVersion().onOrAfter(Version.V_6_2_0)) { + scheduledEvents = in.readList(StreamInput::readString); + if (scheduledEvents.isEmpty()) { + scheduledEvents = Collections.emptyList(); + } + } else { + scheduledEvents = Collections.emptyList(); + } } @Override @@ -160,6 +172,9 @@ public class Bucket implements ToXContentObject, Writeable { out.writeGenericValue(Collections.emptyMap()); } out.writeList(partitionScores); + if (out.getVersion().onOrAfter(Version.V_6_2_0)) { + out.writeStringList(scheduledEvents); + } } @Override @@ -180,6 +195,9 @@ public class Bucket implements ToXContentObject, Writeable { if (partitionScores.isEmpty() == false) { builder.field(PARTITION_SCORES.getPreferredName(), partitionScores); } + if (scheduledEvents.isEmpty() == false) { + builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents); + } builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); builder.endObject(); return builder; @@ -291,6 +309,14 @@ public class Bucket implements ToXContentObject, Writeable { partitionScores = Objects.requireNonNull(scores); } + public List getScheduledEvents() { + return scheduledEvents; + } + + public void setScheduledEvents(List scheduledEvents) { + this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName()); + } + public double partitionInitialAnomalyScore(String partitionValue) { Optional first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue())) .findFirst(); @@ -308,7 +334,7 @@ public class Bucket implements ToXContentObject, Writeable { @Override public int hashCode() { return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records, - isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs); + isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents); } /** @@ -332,7 +358,8 @@ public class Bucket implements ToXContentObject, Writeable { && Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim) && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) && Objects.equals(this.partitionScores, that.partitionScores) - && (this.processingTimeMs == that.processingTimeMs); + && (this.processingTimeMs == that.processingTimeMs) + && Objects.equals(this.scheduledEvents, that.scheduledEvents); } /** diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index f84f1c9d653..5fd587c0c54 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -82,6 +82,7 @@ public final class ReservedFieldNames { Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(), Bucket.PROCESSING_TIME_MS.getPreferredName(), Bucket.PARTITION_SCORES.getPreferredName(), + Bucket.SCHEDULED_EVENTS.getPreferredName(), BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(), BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java index 72c4d2935c4..039f91809a6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java @@ -14,6 +14,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; public class BucketTests extends AbstractSerializingTestCase { @@ -71,6 +74,12 @@ public class BucketTests extends AbstractSerializingTestCase { } bucket.setRecords(records); } + if (randomBoolean()) { + int size = randomInt(10); + List scheduledEvents = new ArrayList<>(size); + IntStream.range(0, size).forEach(i -> scheduledEvents.add(randomAlphaOfLength(20))); + bucket.setScheduledEvents(scheduledEvents); + } return bucket; } @@ -275,4 +284,12 @@ public class BucketTests extends AbstractSerializingTestCase { Bucket bucket = new Bucket("foo", new Date(123), 60L); assertEquals("foo_bucket_123_60", bucket.getId()); } + + public void testCopyConstructor() { + for (int i = 0; i < NUMBER_OF_TEST_RUNS; ++i) { + Bucket bucket = createTestInstance(); + Bucket copy = new Bucket(bucket); + assertThat(copy, equalTo(bucket)); + } + } }