[ML] Add scheduled_events field to bucket (elastic/x-pack-elasticsearch#3513)

Adds `scheduled_events` to the bucket object so
it can read the corresponding field written from the
autodetect process.

Relates elastic/x-pack-elasticsearch#3016

Original commit: elastic/x-pack-elasticsearch@3f35f867f0
This commit is contained in:
Dimitris Athanasiou 2018-01-10 10:57:09 +00:00 committed by GitHub
parent 923428e19f
commit aa25704170
4 changed files with 50 additions and 2 deletions

View File

@ -224,6 +224,9 @@ public class ElasticsearchMappings {
.startObject(Bucket.PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(Bucket.SCHEDULED_EVENTS.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.PARTITION_SCORES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)

View File

@ -44,6 +44,7 @@ public class Bucket implements ToXContentObject, Writeable {
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms");
public static final ParseField PARTITION_SCORES = new ParseField("partition_scores");
public static final ParseField SCHEDULED_EVENTS = new ParseField("scheduled_events");
// Only exists for backwards compatibility; no longer added to mappings
private static final ParseField RECORD_COUNT = new ParseField("record_count");
@ -83,6 +84,7 @@ public class Bucket implements ToXContentObject, Writeable {
PARSER.declareString((bucket, s) -> {}, Result.RESULT_TYPE);
// For bwc with 5.4
PARSER.declareInt((bucket, recordCount) -> {}, RECORD_COUNT);
PARSER.declareStringArray(Bucket::setScheduledEvents, SCHEDULED_EVENTS);
}
private final String jobId;
@ -96,6 +98,7 @@ public class Bucket implements ToXContentObject, Writeable {
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private List<PartitionScore> partitionScores = Collections.emptyList();
private List<String> scheduledEvents = Collections.emptyList();
public Bucket(String jobId, Date timestamp, long bucketSpan) {
this.jobId = jobId;
@ -115,6 +118,7 @@ public class Bucket implements ToXContentObject, Writeable {
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.partitionScores = new ArrayList<>(other.partitionScores);
this.scheduledEvents = new ArrayList<>(other.scheduledEvents);
}
public Bucket(StreamInput in) throws IOException {
@ -137,6 +141,14 @@ public class Bucket implements ToXContentObject, Writeable {
in.readGenericValue();
}
partitionScores = in.readList(PartitionScore::new);
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
scheduledEvents = in.readList(StreamInput::readString);
if (scheduledEvents.isEmpty()) {
scheduledEvents = Collections.emptyList();
}
} else {
scheduledEvents = Collections.emptyList();
}
}
@Override
@ -160,6 +172,9 @@ public class Bucket implements ToXContentObject, Writeable {
out.writeGenericValue(Collections.emptyMap());
}
out.writeList(partitionScores);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeStringList(scheduledEvents);
}
}
@Override
@ -180,6 +195,9 @@ public class Bucket implements ToXContentObject, Writeable {
if (partitionScores.isEmpty() == false) {
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
}
if (scheduledEvents.isEmpty() == false) {
builder.field(SCHEDULED_EVENTS.getPreferredName(), scheduledEvents);
}
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.endObject();
return builder;
@ -291,6 +309,14 @@ public class Bucket implements ToXContentObject, Writeable {
partitionScores = Objects.requireNonNull(scores);
}
public List<String> getScheduledEvents() {
return scheduledEvents;
}
public void setScheduledEvents(List<String> scheduledEvents) {
this.scheduledEvents = ExceptionsHelper.requireNonNull(scheduledEvents, SCHEDULED_EVENTS.getPreferredName());
}
public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();
@ -308,7 +334,7 @@ public class Bucket implements ToXContentObject, Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, records,
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs);
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs, scheduledEvents);
}
/**
@ -332,7 +358,8 @@ public class Bucket implements ToXContentObject, Writeable {
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
&& Objects.equals(this.partitionScores, that.partitionScores)
&& (this.processingTimeMs == that.processingTimeMs);
&& (this.processingTimeMs == that.processingTimeMs)
&& Objects.equals(this.scheduledEvents, that.scheduledEvents);
}
/**

View File

@ -82,6 +82,7 @@ public final class ReservedFieldNames {
Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(),
Bucket.PROCESSING_TIME_MS.getPreferredName(),
Bucket.PARTITION_SCORES.getPreferredName(),
Bucket.SCHEDULED_EVENTS.getPreferredName(),
BucketInfluencer.INITIAL_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.ANOMALY_SCORE.getPreferredName(),
BucketInfluencer.RAW_ANOMALY_SCORE.getPreferredName(), BucketInfluencer.PROBABILITY.getPreferredName(),

View File

@ -14,6 +14,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.equalTo;
public class BucketTests extends AbstractSerializingTestCase<Bucket> {
@ -71,6 +74,12 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
}
bucket.setRecords(records);
}
if (randomBoolean()) {
int size = randomInt(10);
List<String> scheduledEvents = new ArrayList<>(size);
IntStream.range(0, size).forEach(i -> scheduledEvents.add(randomAlphaOfLength(20)));
bucket.setScheduledEvents(scheduledEvents);
}
return bucket;
}
@ -275,4 +284,12 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
Bucket bucket = new Bucket("foo", new Date(123), 60L);
assertEquals("foo_bucket_123_60", bucket.getId());
}
public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; ++i) {
Bucket bucket = createTestInstance();
Bucket copy = new Bucket(bucket);
assertThat(copy, equalTo(bucket));
}
}
}