From 4033c93a137502cf3ed5f98a745c840fb81c1412 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 21 Dec 2016 10:42:39 +0000 Subject: [PATCH] Never persist records inside bucket objects (elastic/elasticsearch#588) This doesn't happen initially when buckets are output by the C++, but buckets can contain records at the moment they're sent for persistence during normalization or during integration tests. It's safest if the persistence code specifically doesn't persist these records. Original commit: elastic/x-pack-elasticsearch@a93135d8c042c12a9bac13dfe298b40dff0ea12b --- .../job/persistence/JobResultsPersister.java | 32 +++++++----- .../xpack/prelert/job/results/Bucket.java | 49 +++++++++++-------- .../AutodetectResultProcessorIT.java | 9 ++++ .../persistence/JobResultsPersisterTests.java | 2 + .../normalizer/ScoresUpdaterTests.java | 2 +- .../prelert/job/results/BucketTests.java | 10 +--- 6 files changed, 62 insertions(+), 42 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index d0a1e942965..ce129a38b72 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities; import org.elasticsearch.xpack.prelert.job.results.Result; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -68,7 +69,7 @@ public class JobResultsPersister extends AbstractComponent { private final String jobId; private final String indexName; - private Builder (String jobId) { + private Builder(String jobId) { this.jobId = Objects.requireNonNull(jobId); indexName = 
AnomalyDetectorsIndex.getJobIndexName(jobId); bulkRequest = client.prepareBulk(); @@ -82,22 +83,30 @@ public class JobResultsPersister extends AbstractComponent { * @return this */ public Builder persistBucket(Bucket bucket) { + // If the supplied bucket has records then create a copy with records + // removed, because we never persist nested records in buckets + Bucket bucketWithoutRecords = bucket; + if (!bucketWithoutRecords.getRecords().isEmpty()) { + bucketWithoutRecords = new Bucket(bucket); + bucketWithoutRecords.setRecords(Collections.emptyList()); + } try { - XContentBuilder content = toXContentBuilder(bucket); + XContentBuilder content = toXContentBuilder(bucketWithoutRecords); logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", - jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucket.getEpoch()); + jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch()); - bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content)); + bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), + bucketWithoutRecords.getId()).setSource(content)); - persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers()); + persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e)); + logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e); } return this; } - private void persistBucketInfluencersStandalone(String jobId, String bucketId, List bucketInfluencers) + private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) throws IOException { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { @@ -128,7 +137,7 @@ public class 
JobResultsPersister extends AbstractComponent { client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content)); } } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e)); + logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e); } return this; @@ -151,7 +160,7 @@ public class JobResultsPersister extends AbstractComponent { client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content)); } } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e)); + logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e); } return this; @@ -173,7 +182,7 @@ public class JobResultsPersister extends AbstractComponent { .setSource(builder)); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores", - new Object[]{jobId}, e)); + new Object[]{jobId}), e); } return this; @@ -294,7 +303,6 @@ public class JobResultsPersister extends AbstractComponent { return true; } - XContentBuilder toXContentBuilder(ToXContent obj) throws IOException { XContentBuilder builder = jsonBuilder(); obj.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -336,7 +344,7 @@ public class JobResultsPersister extends AbstractComponent { .execute().actionGet(); return true; } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}, e)); + logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e); return false; } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java index 961c801a352..55fae93a438 100644 --- 
a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/results/Bucket.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.prelert.job.results; import org.elasticsearch.action.support.ToXContentToBytes; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.io.stream.StreamInput; @@ -36,7 +35,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { /* * Field Names */ - public static final ParseField JOB_ID = Job.ID; + private static final ParseField JOB_ID = Job.ID; public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score"); @@ -98,7 +97,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { private long eventCount; private boolean isInterim; private boolean hadBigNormalizedUpdate; - private List bucketInfluencers = new ArrayList<>(); + private List bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to private long processingTimeMs; private Map perPartitionMaxProbability = Collections.emptyMap(); private List partitionScores = Collections.emptyList(); @@ -109,6 +108,24 @@ public class Bucket extends ToXContentToBytes implements Writeable { this.bucketSpan = bucketSpan; } + public Bucket(Bucket other) { + this.jobId = other.jobId; + this.timestamp = other.timestamp; + this.bucketSpan = other.bucketSpan; + this.anomalyScore = other.anomalyScore; + this.initialAnomalyScore = other.initialAnomalyScore; + this.maxNormalizedProbability = other.maxNormalizedProbability; + this.recordCount = other.recordCount; + this.records = new ArrayList<>(other.records); + this.eventCount = other.eventCount; + this.isInterim = other.isInterim; + this.hadBigNormalizedUpdate = other.hadBigNormalizedUpdate; + 
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers); + this.processingTimeMs = other.processingTimeMs; + this.perPartitionMaxProbability = other.perPartitionMaxProbability; + this.partitionScores = new ArrayList<>(other.partitionScores); + } + @SuppressWarnings("unchecked") public Bucket(StreamInput in) throws IOException { jobId = in.readString(); @@ -155,7 +172,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore); builder.field(MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability); builder.field(RECORD_COUNT.getPreferredName(), recordCount); - if (records != null && !records.isEmpty()) { + if (!records.isEmpty()) { builder.field(RECORDS.getPreferredName(), records); } builder.field(EVENT_COUNT.getPreferredName(), eventCount); @@ -168,7 +185,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { return builder; } - public String getJobId() { return jobId; } @@ -208,8 +224,8 @@ public class Bucket extends ToXContentToBytes implements Writeable { return initialAnomalyScore; } - public void setInitialAnomalyScore(double influenceScore) { - this.initialAnomalyScore = influenceScore; + public void setInitialAnomalyScore(double initialAnomalyScore) { + this.initialAnomalyScore = initialAnomalyScore; } public double getMaxNormalizedProbability() { @@ -234,16 +250,14 @@ public class Bucket extends ToXContentToBytes implements Writeable { * only be present when the bucket was retrieved and expanded * to contain the associated records. * - * @return null or the anomaly records for the bucket - * if the bucket was expanded. + * @return the anomaly records for the bucket IF the bucket was expanded. 
*/ - @Nullable public List getRecords() { return records; } public void setRecords(List records) { - this.records = records; + this.records = Objects.requireNonNull(records); } /** @@ -278,13 +292,10 @@ public class Bucket extends ToXContentToBytes implements Writeable { } public void setBucketInfluencers(List bucketInfluencers) { - this.bucketInfluencers = bucketInfluencers; + this.bucketInfluencers = Objects.requireNonNull(bucketInfluencers); } public void addBucketInfluencer(BucketInfluencer bucketInfluencer) { - if (bucketInfluencers == null) { - bucketInfluencers = new ArrayList<>(); - } bucketInfluencers.add(bucketInfluencer); } @@ -293,7 +304,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { } public void setPartitionScores(List scores) { - partitionScores = scores; + partitionScores = Objects.requireNonNull(scores); } public Map getPerPartitionMaxProbability() { @@ -301,7 +312,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { } public void setPerPartitionMaxProbability(Map perPartitionMaxProbability) { - this.perPartitionMaxProbability = perPartitionMaxProbability; + this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability); } public double partitionInitialAnomalyScore(String partitionValue) { @@ -321,7 +332,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { @Override public int hashCode() { // hadBigNormalizedUpdate is deliberately excluded from the hash - // as is id, which is generated by the datastore return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records, isInterim, bucketSpan, bucketInfluencers); } @@ -342,7 +352,6 @@ public class Bucket extends ToXContentToBytes implements Writeable { Bucket that = (Bucket) other; // hadBigNormalizedUpdate is deliberately excluded from the test - // as is id, which is generated by the datastore return Objects.equals(this.jobId, that.jobId) && 
Objects.equals(this.timestamp, that.timestamp) && (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan) && (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore) @@ -380,7 +389,7 @@ public class Bucket extends ToXContentToBytes implements Writeable { * @return true if the bucket should be normalized or false otherwise */ public boolean isNormalizable() { - if (bucketInfluencers == null || bucketInfluencers.isEmpty()) { + if (bucketInfluencers.isEmpty()) { return false; } return anomalyScore > 0.0 || recordCount > 0; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java index 14328b175da..01ef8bb4973 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -115,6 +115,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); + // Records are not persisted to Elasticsearch as an array within the bucket + // documents, so remove them from the expected bucket before comparing + bucket.setRecords(Collections.emptyList()); assertEquals(bucket, persistedBucket.results().get(0)); QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); @@ -179,6 +182,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); + // Records are not persisted to Elasticsearch as an array 
within the bucket + // documents, so remove them from the expected bucket before comparing + nonInterimBucket.setRecords(Collections.emptyList()); assertEquals(nonInterimBucket, persistedBucket.results().get(0)); QueryPage persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build()); @@ -225,6 +231,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); assertEquals(1, persistedBucket.count()); + // Records are not persisted to Elasticsearch as an array within the bucket + // documents, so remove them from the expected bucket before comparing + finalBucket.setRecords(Collections.emptyList()); assertEquals(finalBucket, persistedBucket.results().get(0)); QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java index 7f3c7af7252..1a53bdf9cd7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersisterTests.java @@ -76,6 +76,8 @@ public class JobResultsPersisterTests extends ESTestCase { assertTrue(s.matches(".*event_count.:57.*")); assertTrue(s.matches(".*bucket_span.:123456.*")); assertTrue(s.matches(".*processing_time_ms.:8888.*")); + // There should NOT be any nested records (matches() is a whole-string match, so the trailing .* is required) + assertFalse(s.matches(".*records.*")); s = list.get(1).string(); assertTrue(s.matches(".*probability.:0\\.0054.*")); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java
b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java index 0fbf10b94d3..2ca4f8183d8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/normalizer/ScoresUpdaterTests.java @@ -95,7 +95,7 @@ public class ScoresUpdaterTests extends ESTestCase { public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException { Bucket bucket = generateBucket(new Date(0)); bucket.setAnomalyScore(0.0); - bucket.setBucketInfluencers(null); + bucket.setBucketInfluencers(new ArrayList<>()); Deque buckets = new ArrayDeque<>(); buckets.add(bucket); givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java index 3f474e10248..98e725f9aa5 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java @@ -165,7 +165,7 @@ public class BucketTests extends AbstractSerializingTestCase { Bucket bucket1 = new Bucket("foo", new Date(123), 123); bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1))); Bucket bucket2 = new Bucket("foo", new Date(123), 123); - bucket2.setRecords(null); + bucket2.setRecords(Collections.emptyList()); assertFalse(bucket1.equals(bucket2)); assertFalse(bucket2.equals(bucket1)); @@ -252,14 +252,6 @@ public class BucketTests extends AbstractSerializingTestCase { assertEquals(bucket1.hashCode(), bucket2.hashCode()); } - public void testIsNormalizable_GivenNullBucketInfluencers() { - Bucket bucket = new Bucket("foo", new Date(123), 123); - bucket.setBucketInfluencers(null); - 
bucket.setAnomalyScore(90.0); - - assertFalse(bucket.isNormalizable()); - } - public void testIsNormalizable_GivenEmptyBucketInfluencers() { Bucket bucket = new Bucket("foo", new Date(123), 123); bucket.setBucketInfluencers(Collections.emptyList());