Never persist records inside bucket objects (elastic/elasticsearch#588)

This doesn't happen initially when buckets are output by the C++, but
buckets can contain records at the moment they're sent for persistence
during normalization or during integration tests.  It's safest if the
persistence code specifically doesn't persist these records.

Original commit: elastic/x-pack-elasticsearch@a93135d8c0
This commit is contained in:
David Roberts 2016-12-21 10:42:39 +00:00 committed by GitHub
parent 0b5b26284b
commit 4033c93a13
6 changed files with 62 additions and 42 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.prelert.job.results.Result;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -68,7 +69,7 @@ public class JobResultsPersister extends AbstractComponent {
private final String jobId;
private final String indexName;
private Builder (String jobId) {
private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
bulkRequest = client.prepareBulk();
@ -82,22 +83,30 @@ public class JobResultsPersister extends AbstractComponent {
* @return this
*/
public Builder persistBucket(Bucket bucket) {
// If the supplied bucket has records then create a copy with records
// removed, because we never persist nested records in buckets
Bucket bucketWithoutRecords = bucket;
if (!bucketWithoutRecords.getRecords().isEmpty()) {
bucketWithoutRecords = new Bucket(bucket);
bucketWithoutRecords.setRecords(Collections.emptyList());
}
try {
XContentBuilder content = toXContentBuilder(bucket);
XContentBuilder content = toXContentBuilder(bucketWithoutRecords);
logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}",
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucket.getEpoch());
jobId, Bucket.RESULT_TYPE_VALUE, indexName, bucketWithoutRecords.getEpoch());
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(), bucket.getId()).setSource(content));
bulkRequest.add(client.prepareIndex(indexName, Result.TYPE.getPreferredName(),
bucketWithoutRecords.getId()).setSource(content));
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers());
persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}, e));
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", new Object[] {jobId}), e);
}
return this;
}
private void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers)
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers)
throws IOException {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
@ -128,7 +137,7 @@ public class JobResultsPersister extends AbstractComponent {
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), record.getId()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}, e));
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e);
}
return this;
@ -151,7 +160,7 @@ public class JobResultsPersister extends AbstractComponent {
client.prepareIndex(indexName, Result.TYPE.getPreferredName(), influencer.getId()).setSource(content));
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}, e));
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e);
}
return this;
@ -173,7 +182,7 @@ public class JobResultsPersister extends AbstractComponent {
.setSource(builder));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
new Object[]{jobId}, e));
new Object[]{jobId}), e);
}
return this;
@ -294,7 +303,6 @@ public class JobResultsPersister extends AbstractComponent {
return true;
}
XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -336,7 +344,7 @@ public class JobResultsPersister extends AbstractComponent {
.execute().actionGet();
return true;
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}, e));
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}), e);
return false;
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.prelert.job.results;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
@ -36,7 +35,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
/*
* Field Names
*/
public static final ParseField JOB_ID = Job.ID;
private static final ParseField JOB_ID = Job.ID;
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score");
@ -98,7 +97,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private long eventCount;
private boolean isInterim;
private boolean hadBigNormalizedUpdate;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>();
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
private List<PartitionScore> partitionScores = Collections.emptyList();
@ -109,6 +108,24 @@ public class Bucket extends ToXContentToBytes implements Writeable {
this.bucketSpan = bucketSpan;
}
public Bucket(Bucket other) {
this.jobId = other.jobId;
this.timestamp = other.timestamp;
this.bucketSpan = other.bucketSpan;
this.anomalyScore = other.anomalyScore;
this.initialAnomalyScore = other.initialAnomalyScore;
this.maxNormalizedProbability = other.maxNormalizedProbability;
this.recordCount = other.recordCount;
this.records = new ArrayList<>(other.records);
this.eventCount = other.eventCount;
this.isInterim = other.isInterim;
this.hadBigNormalizedUpdate = other.hadBigNormalizedUpdate;
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.perPartitionMaxProbability = other.perPartitionMaxProbability;
this.partitionScores = new ArrayList<>(other.partitionScores);
}
@SuppressWarnings("unchecked")
public Bucket(StreamInput in) throws IOException {
jobId = in.readString();
@ -155,7 +172,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);
builder.field(MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability);
builder.field(RECORD_COUNT.getPreferredName(), recordCount);
if (records != null && !records.isEmpty()) {
if (!records.isEmpty()) {
builder.field(RECORDS.getPreferredName(), records);
}
builder.field(EVENT_COUNT.getPreferredName(), eventCount);
@ -168,7 +185,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
return builder;
}
public String getJobId() {
return jobId;
}
@ -208,8 +224,8 @@ public class Bucket extends ToXContentToBytes implements Writeable {
return initialAnomalyScore;
}
public void setInitialAnomalyScore(double influenceScore) {
this.initialAnomalyScore = influenceScore;
public void setInitialAnomalyScore(double initialAnomalyScore) {
this.initialAnomalyScore = initialAnomalyScore;
}
public double getMaxNormalizedProbability() {
@ -234,16 +250,14 @@ public class Bucket extends ToXContentToBytes implements Writeable {
* only be present when the bucket was retrieved and expanded
* to contain the associated records.
*
* @return <code>null</code> or the anomaly records for the bucket
* if the bucket was expanded.
* @return the anomaly records for the bucket IF the bucket was expanded.
*/
@Nullable
public List<AnomalyRecord> getRecords() {
return records;
}
public void setRecords(List<AnomalyRecord> records) {
this.records = records;
this.records = Objects.requireNonNull(records);
}
/**
@ -278,13 +292,10 @@ public class Bucket extends ToXContentToBytes implements Writeable {
}
public void setBucketInfluencers(List<BucketInfluencer> bucketInfluencers) {
this.bucketInfluencers = bucketInfluencers;
this.bucketInfluencers = Objects.requireNonNull(bucketInfluencers);
}
public void addBucketInfluencer(BucketInfluencer bucketInfluencer) {
if (bucketInfluencers == null) {
bucketInfluencers = new ArrayList<>();
}
bucketInfluencers.add(bucketInfluencer);
}
@ -293,7 +304,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
}
public void setPartitionScores(List<PartitionScore> scores) {
partitionScores = scores;
partitionScores = Objects.requireNonNull(scores);
}
public Map<String, Double> getPerPartitionMaxProbability() {
@ -301,7 +312,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
}
public void setPerPartitionMaxProbability(Map<String, Double> perPartitionMaxProbability) {
this.perPartitionMaxProbability = perPartitionMaxProbability;
this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability);
}
public double partitionInitialAnomalyScore(String partitionValue) {
@ -321,7 +332,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
// hadBigNormalizedUpdate is deliberately excluded from the hash
// as is id, which is generated by the datastore
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records,
isInterim, bucketSpan, bucketInfluencers);
}
@ -342,7 +352,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
Bucket that = (Bucket) other;
// hadBigNormalizedUpdate is deliberately excluded from the test
// as is id, which is generated by the datastore
return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
&& (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
@ -380,7 +389,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalizable() {
if (bucketInfluencers == null || bucketInfluencers.isEmpty()) {
if (bucketInfluencers.isEmpty()) {
return false;
}
return anomalyScore > 0.0 || recordCount > 0;

View File

@ -115,6 +115,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
bucket.setRecords(Collections.emptyList());
assertEquals(bucket, persistedBucket.results().get(0));
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
@ -179,6 +182,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
nonInterimBucket.setRecords(Collections.emptyList());
assertEquals(nonInterimBucket, persistedBucket.results().get(0));
QueryPage<Influencer> persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build());
@ -225,6 +231,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
finalBucket.setRecords(Collections.emptyList());
assertEquals(finalBucket, persistedBucket.results().get(0));
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());

View File

@ -76,6 +76,8 @@ public class JobResultsPersisterTests extends ESTestCase {
assertTrue(s.matches(".*event_count.:57.*"));
assertTrue(s.matches(".*bucket_span.:123456.*"));
assertTrue(s.matches(".*processing_time_ms.:8888.*"));
// There should NOT be any nested records
assertFalse(s.matches(".*records*"));
s = list.get(1).string();
assertTrue(s.matches(".*probability.:0\\.0054.*"));

View File

@ -95,7 +95,7 @@ public class ScoresUpdaterTests extends ESTestCase {
public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0);
bucket.setBucketInfluencers(null);
bucket.setBucketInfluencers(new ArrayList<>());
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);

View File

@ -165,7 +165,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
Bucket bucket1 = new Bucket("foo", new Date(123), 123);
bucket1.setRecords(Arrays.asList(new AnomalyRecord("foo", new Date(123), 123, 1)));
Bucket bucket2 = new Bucket("foo", new Date(123), 123);
bucket2.setRecords(null);
bucket2.setRecords(Collections.emptyList());
assertFalse(bucket1.equals(bucket2));
assertFalse(bucket2.equals(bucket1));
@ -252,14 +252,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertEquals(bucket1.hashCode(), bucket2.hashCode());
}
public void testIsNormalizable_GivenNullBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(null);
bucket.setAnomalyScore(90.0);
assertFalse(bucket.isNormalizable());
}
public void testIsNormalizable_GivenEmptyBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(Collections.emptyList());