diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
index d0a1e942965..ce129a38b72 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java
@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
 import org.elasticsearch.xpack.prelert.job.results.Result;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -68,7 +69,7 @@ public class JobResultsPersister extends AbstractComponent {
         private final String jobId;
         private final String indexName;
 
-        private Builder (String jobId) {
+        private Builder(String jobId) {
            this.jobId = Objects.requireNonNull(jobId);
            indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
            bulkRequest = client.prepareBulk();
@@ -82,22 +83,30 @@ public class JobResultsPersister extends AbstractComponent {
          * @return this
          */
         public Builder persistBucket(Bucket bucket) {
+            // If the supplied bucket has records then create a copy with records
+            // removed, because we never persist nested records in buckets
+            Bucket bucketWithoutRecords = bucket;
+            if (!bucketWithoutRecords.getRecords().isEmpty()) {
+                bucketWithoutRecords = new Bucket(bucket);
+                bucketWithoutRecords.setRecords(Collections.emptyList());
+            }
             try {
-                XContentBuilder content = toXContentBuilder(bucket);
+                XContentBuilder content = toXContentBuilder(bucketWithoutRecords);
                 logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
-                        jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucket.getEpoch());
+                        jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
 
-                bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content));
+                bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(),
+                        bucketWithoutRecords.getId()).setSource(content));
 
-                persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers());
+                persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
             } catch (IOException e) {
-                logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e));
+                logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e);
             }
 
             return this;
         }
 
-        private void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers)
+        private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers)
                 throws IOException {
             if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
                 for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
@@ -128,7 +137,7 @@ public class JobResultsPersister extends AbstractComponent {
                             client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
                 }
             } catch (IOException e) {
-                logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e));
+                logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e);
             }
 
             return this;
@@ -151,7 +160,7 @@ public class JobResultsPersister extends AbstractComponent {
                             client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
                 }
             } catch (IOException e) {
-                logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e));
+                logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e);
             }
 
             return this;
@@ -173,7 +182,7 @@ public class JobResultsPersister extends AbstractComponent {
                         .setSource(builder));
             } catch (IOException e) {
                 logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
-                        new Object[]{jobId}, e));
+                        new Object[]{jobId}), e);
             }
 
             return this;
@@ -294,7 +303,6 @@ public class JobResultsPersister extends AbstractComponent {
         return true;
     }
 
-
     XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
         XContentBuilder builder = jsonBuilder();
         obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
@@ -336,7 +344,7 @@ public class JobResultsPersister extends AbstractComponent {
                     .execute().actionGet();
             return true;
         } catch (IOException e) {
-            logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}, e));
+            logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);
            return false;
        }
    }
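A note on the `persistBucket` hunk above: the bucket is copied before its records are cleared because the caller still owns the original, and those records are indexed as separate documents elsewhere in this builder. A standalone sketch of the copy-before-mutate idiom (the `Doc` class is illustrative, not the production `Bucket`):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: shows why persistBucket copies the bucket instead of
// calling setRecords(Collections.emptyList()) on the caller's instance.
class CopyBeforeMutateDemo {
    static final class Doc {
        List<String> records = new ArrayList<>(List.of("r1", "r2"));
        Doc() {}
        Doc(Doc other) { this.records = new ArrayList<>(other.records); } // like new Bucket(bucket)
    }

    public static void main(String[] args) {
        Doc original = new Doc();
        Doc withoutRecords = new Doc(original);
        withoutRecords.records = Collections.emptyList();
        // The original keeps its records, so they can still be persisted
        // as separate documents after the bucket document has been built.
        System.out.println(original.records);       // [r1, r2]
        System.out.println(withoutRecords.records); // []
    }
}
```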
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
index 961c801a352..55fae93a438 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java
@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.job.results;
 
 import org.elasticsearch.action.support.ToXContentToBytes;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.ParseFieldMatcherSupplier;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -36,7 +35,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     /*
      * Field Names
      */
-    public static final ParseField JOB_ID = Job.ID;
+    private static final ParseField JOB_ID = Job.ID;
     public static final ParseField TIMESTAMP = new ParseField("timestamp");
     public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score");
@@ -98,7 +97,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     private long eventCount;
     private boolean isInterim;
     private boolean hadBigNormalizedUpdate;
-    private List<BucketInfluencer> bucketInfluencers = new ArrayList<>();
+    private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
     private long processingTimeMs;
     private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
     private List<PartitionScore> partitionScores = Collections.emptyList();
@@ -109,6 +108,24 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         this.bucketSpan = bucketSpan;
     }
 
+    public Bucket(Bucket other) {
+        this.jobId = other.jobId;
+        this.timestamp = other.timestamp;
+        this.bucketSpan = other.bucketSpan;
+        this.anomalyScore = other.anomalyScore;
+        this.initialAnomalyScore = other.initialAnomalyScore;
+        this.maxNormalizedProbability = other.maxNormalizedProbability;
+        this.recordCount = other.recordCount;
+        this.records = new ArrayList<>(other.records);
+        this.eventCount = other.eventCount;
+        this.isInterim = other.isInterim;
+        this.hadBigNormalizedUpdate = other.hadBigNormalizedUpdate;
+        this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
+        this.processingTimeMs = other.processingTimeMs;
+        this.perPartitionMaxProbability = other.perPartitionMaxProbability;
+        this.partitionScores = new ArrayList<>(other.partitionScores);
+    }
+
     @SuppressWarnings("unchecked")
     public Bucket(StreamInput in) throws IOException {
         jobId = in.readString();
@@ -155,7 +172,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);
         builder.field(MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability);
         builder.field(RECORD_COUNT.getPreferredName(), recordCount);
-        if (records != null && !records.isEmpty()) {
+        if (!records.isEmpty()) {
             builder.field(RECORDS.getPreferredName(), records);
         }
         builder.field(EVENT_COUNT.getPreferredName(), eventCount);
@@ -168,7 +185,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         return builder;
     }
 
-
     public String getJobId() {
         return jobId;
     }
@@ -208,8 +224,8 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         return initialAnomalyScore;
     }
 
-    public void setInitialAnomalyScore(double influenceScore) {
-        this.initialAnomalyScore = influenceScore;
+    public void setInitialAnomalyScore(double initialAnomalyScore) {
+        this.initialAnomalyScore = initialAnomalyScore;
     }
 
     public double getMaxNormalizedProbability() {
@@ -234,16 +250,14 @@ public class Bucket extends ToXContentToBytes implements Writeable {
      * only be present when the bucket was retrieved and expanded
      * to contain the associated records.
      *
-     * @return null or the anomaly records for the bucket
-     *         if the bucket was expanded.
+     * @return the anomaly records for the bucket IF the bucket was expanded.
      */
-    @Nullable
     public List<AnomalyRecord> getRecords() {
         return records;
     }
 
     public void setRecords(List<AnomalyRecord> records) {
-        this.records = records;
+        this.records = Objects.requireNonNull(records);
     }
 
     /**
@@ -278,13 +292,10 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     }
 
     public void setBucketInfluencers(List<BucketInfluencer> bucketInfluencers) {
-        this.bucketInfluencers = bucketInfluencers;
+        this.bucketInfluencers = Objects.requireNonNull(bucketInfluencers);
     }
 
     public void addBucketInfluencer(BucketInfluencer bucketInfluencer) {
-        if (bucketInfluencers == null) {
-            bucketInfluencers = new ArrayList<>();
-        }
         bucketInfluencers.add(bucketInfluencer);
     }
 
@@ -293,7 +304,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     }
 
     public void setPartitionScores(List<PartitionScore> scores) {
-        partitionScores = scores;
+        partitionScores = Objects.requireNonNull(scores);
     }
 
     public Map<String, Double> getPerPartitionMaxProbability() {
@@ -301,7 +312,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     }
 
     public void setPerPartitionMaxProbability(Map<String, Double> perPartitionMaxProbability) {
-        this.perPartitionMaxProbability = perPartitionMaxProbability;
+        this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability);
     }
 
     public double partitionInitialAnomalyScore(String partitionValue) {
@@ -321,7 +332,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
     @Override
     public int hashCode() {
         // hadBigNormalizedUpdate is deliberately excluded from the hash
-        // as is id, which is generated by the datastore
         return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability,
                 recordCount, records, isInterim, bucketSpan, bucketInfluencers);
     }
@@ -342,7 +352,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         Bucket that = (Bucket) other;
 
         // hadBigNormalizedUpdate is deliberately excluded from the test
-        // as is id, which is generated by the datastore
         return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
                 && (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
                 && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
@@ -380,7 +389,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
      * @return true if the bucket should be normalized or false otherwise
      */
     public boolean isNormalizable() {
-        if (bucketInfluencers == null || bucketInfluencers.isEmpty()) {
+        if (bucketInfluencers.isEmpty()) {
             return false;
         }
         return anomalyScore > 0.0 || recordCount > 0;
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java
index 14328b175da..01ef8bb4973 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java
@@ -115,6 +115,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
 
         QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
         assertEquals(1, persistedBucket.count());
+        // Records are not persisted to Elasticsearch as an array within the bucket
+        // documents, so remove them from the expected bucket before comparing
+        bucket.setRecords(Collections.emptyList());
         assertEquals(bucket, persistedBucket.results().get(0));
 
         QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
@@ -179,6 +182,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
 
         QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
         assertEquals(1, persistedBucket.count());
+        // Records are not persisted to Elasticsearch as an array within the bucket
+        // documents, so remove them from the expected bucket before comparing
+        nonInterimBucket.setRecords(Collections.emptyList());
         assertEquals(nonInterimBucket, persistedBucket.results().get(0));
 
         QueryPage<Influencer> persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build());
@@ -225,6 +231,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
 
         QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
         assertEquals(1, persistedBucket.count());
+        // Records are not persisted to Elasticsearch as an array within the bucket
+        // documents, so remove them from the expected bucket before comparing
+        finalBucket.setRecords(Collections.emptyList());
         assertEquals(finalBucket, persistedBucket.results().get(0));
 
         QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
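The `Bucket` setters above now go through `Objects.requireNonNull`, which is why these tests (and the ones below) pass empty collections instead of `null`. A minimal standalone sketch of the fail-fast setter idiom, with illustrative names rather than the production class:

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Illustrative only, not the production Bucket: rejecting null in the setter
// lets readers such as isNormalizable() drop their null checks entirely.
class FailFastSetterDemo {
    private List<String> bucketInfluencers = Collections.emptyList();

    void setBucketInfluencers(List<String> bucketInfluencers) {
        this.bucketInfluencers = Objects.requireNonNull(bucketInfluencers);
    }

    boolean isNormalizable() {
        return !bucketInfluencers.isEmpty(); // no "== null" branch needed
    }

    public static void main(String[] args) {
        FailFastSetterDemo demo = new FailFastSetterDemo();
        demo.setBucketInfluencers(Collections.emptyList()); // fine
        System.out.println(demo.isNormalizable());          // false
        try {
            demo.setBucketInfluencers(null);                // fails at the call site
        } catch (NullPointerException expected) {
            System.out.println("null rejected at set time");
        }
    }
}
```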
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java
index 7f3c7af7252..1a53bdf9cd7 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java
@@ -76,6 +76,8 @@ public class JobResultsPersisterTests extends ESTestCase {
         assertTrue(s.matches(".*event_count.:57.*"));
         assertTrue(s.matches(".*bucket_span.:123456.*"));
         assertTrue(s.matches(".*processing_time_ms.:8888.*"));
+        // There should NOT be any nested records
+        assertFalse(s.matches(".*records.*"));
 
         s = list.get(1).string();
         assertTrue(s.matches(".*probability.:0\\.0054.*"));
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
index 0fbf10b94d3..2ca4f8183d8 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
@@ -95,7 +95,7 @@ public class ScoresUpdaterTests extends ESTestCase {
     public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException {
         Bucket bucket = generateBucket(new Date(0));
         bucket.setAnomalyScore(0.0);
-        bucket.setBucketInfluencers(null);
+        bucket.setBucketInfluencers(new ArrayList<>());
         Deque<Bucket> buckets = new ArrayDeque<>();
         buckets.add(bucket);
         givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
index 3f474e10248..98e725f9aa5 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java
@@ -165,7 +165,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
         Bucket bucket1 = new Bucket("foo", new Date(123), 123);
         bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1)));
         Bucket bucket2 = new Bucket("foo", new Date(123), 123);
-        bucket2.setRecords(null);
+        bucket2.setRecords(Collections.emptyList());
 
         assertFalse(bucket1.equals(bucket2));
         assertFalse(bucket2.equals(bucket1));
@@ -252,14 +252,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
         assertEquals(bucket1.hashCode(), bucket2.hashCode());
     }
 
-    public void testIsNormalizable_GivenNullBucketInfluencers() {
-        Bucket bucket = new Bucket("foo", new Date(123), 123);
-        bucket.setBucketInfluencers(null);
-        bucket.setAnomalyScore(90.0);
-
-        assertFalse(bucket.isNormalizable());
-    }
-
     public void testIsNormalizable_GivenEmptyBucketInfluencers() {
         Bucket bucket = new Bucket("foo", new Date(123), 123);
         bucket.setBucketInfluencers(Collections.emptyList());
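A note on the assertion added in `JobResultsPersisterTests`: `String.matches` requires the pattern to cover the entire string, so a pattern without a trailing `.*` does not behave as a "contains" check, hence the trailing `.*` in the new assertion. A quick standalone demonstration (the JSON string is made up for illustration):

```java
// Illustrative only: String.matches anchors the regex to the whole string.
public class MatchesDemo {
    public static void main(String[] args) {
        String s = "{\"records\":[],\"event_count\":57}";
        // ".*records*" means: anything, then "record", then zero or more 's',
        // with nothing allowed after that, so it only matches strings that end there.
        System.out.println(s.matches(".*records*"));  // false
        // ".*records.*" is the intended "contains records" check.
        System.out.println(s.matches(".*records.*")); // true
    }
}
```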