From 2302dc78ba7cf2ad257b98dfb51ac72a707070de Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 12 Dec 2016 16:55:20 +0000 Subject: [PATCH] AutodetectResultProcessor Integration Test (elastic/elasticsearch#516) * Add results processor integration test * Integration tests for AutodetectResultProcessor Original commit: elastic/x-pack-elasticsearch@19e7ec48dd96bc41ece03e6130caaec685b50ca3 --- .../job/persistence/JobDataDeleter.java | 6 +- .../prelert/job/persistence/JobProvider.java | 36 +- .../output/AutoDetectResultProcessor.java | 6 +- .../AutodetectResultProcessorIT.java | 403 ++++++++++++++++++ .../AutoDetectResultProcessorTests.java | 8 +- .../job/results/AnomalyRecordTests.java | 83 ++++ .../prelert/job/results/BucketTests.java | 17 +- .../job/results/CategoryDefinitionTests.java | 10 +- .../prelert/job/results/InfluencerTests.java | 12 +- .../job/results/ModelDebugOutputTests.java | 6 +- 10 files changed, 562 insertions(+), 25 deletions(-) create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java create mode 100644 elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecordTests.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java index ee3c1474c14..67e5436b938 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobDataDeleter.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import 
org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -133,10 +134,9 @@ public class JobDataDeleter { QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); SearchResponse searchResponse = client.prepareSearch(index) - .setTypes(Result.RESULT_TYPE.getPreferredName()) - .setQuery(qb) + .setTypes(Result.TYPE.getPreferredName()) + .setQuery(new ConstantScoreQueryBuilder(qb)) .setFetchSource(false) - .addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)) .setScroll(SCROLL_CONTEXT_DURATION) .setSize(SCROLL_SIZE) .get(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java index f61c1ab7ddb..5372da03f44 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobProvider.java @@ -791,7 +791,7 @@ public class JobProvider { try { response = searchRequestBuilder.get(); } catch (IndexNotFoundException e) { - throw new ResourceNotFoundException("job " + jobId + " not found"); + throw ExceptionsHelper.missingJobException(jobId); } List influencers = new ArrayList<>(); @@ -1039,6 +1039,40 @@ public class JobProvider { return quantiles; } + public QueryPage modelDebugOutput(String jobId, int from, int size) { + + SearchResponse searchResponse; + try { + String indexName = JobResultsPersister.getJobIndexName(jobId); + LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}", + ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size); + + searchResponse = client.prepareSearch(indexName) + .setTypes(Result.TYPE.getPreferredName()) + .setQuery(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ModelDebugOutput.RESULT_TYPE_VALUE)) + .setFrom(from).setSize(size) + .get(); + } catch 
(IndexNotFoundException e) { + throw ExceptionsHelper.missingJobException(jobId); + } + + List results = new ArrayList<>(); + + for (SearchHit hit : searchResponse.getHits().getHits()) { + BytesReference source = hit.getSourceRef(); + XContentParser parser; + try { + parser = XContentFactory.xContent(source).createParser(source); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse modelDebugOutput", e); + } + ModelDebugOutput modelDebugOutput = ModelDebugOutput.PARSER.apply(parser, () -> parseFieldMatcher); + results.add(modelDebugOutput); + } + + return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), ModelDebugOutput.RESULTS_FIELD); + } + /** * Get the job's model size stats. */ diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index 18597bc183c..87262dfeee0 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -85,7 +85,7 @@ public class AutoDetectResultProcessor { LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); } } - LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount); + LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); LOGGER.info("[{}] Parse results Complete", jobId); } catch (Exception e) { LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e); @@ -110,9 +110,7 @@ public class AutoDetectResultProcessor { // persist after deleting interim results in case the new // results are also interim 
context.bulkResultsPersister.persistBucket(bucket).executeRequest(); - context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId); - } List records = result.getRecords(); if (records != null && !records.isEmpty()) { @@ -208,7 +206,7 @@ public class AutoDetectResultProcessor { Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { this.jobId = jobId; this.isPerPartitionNormalization = isPerPartitionNormalization; - this.deleteInterimRequired = true; + this.deleteInterimRequired = false; this.bulkResultsPersister = bulkResultsPersister; } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java new file mode 100644 index 00000000000..6fd52384d38 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -0,0 +1,403 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.prelert.integration; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.prelert.job.AnalysisConfig; +import org.elasticsearch.xpack.prelert.job.Detector; +import org.elasticsearch.xpack.prelert.job.Job; +import org.elasticsearch.xpack.prelert.job.ModelSizeStats; +import org.elasticsearch.xpack.prelert.job.ModelSnapshot; +import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; +import org.elasticsearch.xpack.prelert.job.persistence.InfluencersQueryBuilder; +import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; +import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; +import org.elasticsearch.xpack.prelert.job.persistence.RecordsQueryBuilder; +import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutoDetectResultProcessor; +import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser; +import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknowledgement; +import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser; +import org.elasticsearch.xpack.prelert.job.process.normalizer.noop.NoOpRenormaliser; +import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles; +import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord; +import org.elasticsearch.xpack.prelert.job.results.AnomalyRecordTests; +import org.elasticsearch.xpack.prelert.job.results.Bucket; +import org.elasticsearch.xpack.prelert.job.results.BucketTests; +import 
org.elasticsearch.xpack.prelert.job.results.CategoryDefinition; +import org.elasticsearch.xpack.prelert.job.results.CategoryDefinitionTests; +import org.elasticsearch.xpack.prelert.job.results.Influencer; +import org.elasticsearch.xpack.prelert.job.results.InfluencerTests; +import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput; +import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutputTests; +import org.junit.Before; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { + private static final String JOB_ID = "foo"; + + private Renormaliser renormaliser; + private JobResultsPersister jobResultsPersister; + private AutodetectResultsParser autodetectResultsParser; + private JobProvider jobProvider; + + @Before + private void createComponents() { + renormaliser = new NoOpRenormaliser(); + jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); + ParseFieldMatcher matcher = new ParseFieldMatcher(nodeSettings()); + autodetectResultsParser = new AutodetectResultsParser(nodeSettings(), () -> matcher); + jobProvider = new JobProvider(client(), 1, matcher); + } + + public void testProcessResults() throws IOException { + createJob(); + + AutoDetectResultProcessor resultProcessor = + new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(outputStream); + + Bucket bucket = createBucket(false); + assertNotNull(bucket); + List records = createRecords(false); + List influencers = createInfluencers(false); + CategoryDefinition 
categoryDefinition = createCategoryDefinition(); + ModelDebugOutput modelDebugOutput = createModelDebugOutput(); + ModelSizeStats modelSizeStats = createModelSizeStats(); + ModelSnapshot modelSnapshot = createModelSnapshot(); + Quantiles quantiles = createQuantiles(); + + // Add the bucket last as the bucket result triggers persistence + ResultsBuilder resultBuilder = new ResultsBuilder() + .start() + .addRecords(records) + .addInfluencers(influencers) + .addCategoryDefinition(categoryDefinition) + .addModelDebugOutput(modelDebugOutput) + .addModelSizeStats(modelSizeStats) + .addModelSnapshot(modelSnapshot) + .addQuantiles(quantiles) + .addBucket(bucket) + .end(); + + new Thread(() -> { + try { + writeResults(resultBuilder.build(), outputStream); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + + resultProcessor.process(JOB_ID, inputStream, false); + jobResultsPersister.commitWrites(JOB_ID); + + QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); + assertEquals(1, persistedBucket.count()); + assertEquals(bucket, persistedBucket.results().get(0)); + + QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); + assertResultsAreSame(records, persistedRecords); + + QueryPage persistedInfluencers = + jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().includeInterim(true).build()); + assertResultsAreSame(influencers, persistedInfluencers); + + QueryPage persistedDefinition = + jobProvider.categoryDefinition(JOB_ID, Long.toString(categoryDefinition.getCategoryId())); + assertEquals(1, persistedDefinition.count()); + assertEquals(categoryDefinition, persistedDefinition.results().get(0)); + + QueryPage persistedModelDebugOutput = jobProvider.modelDebugOutput(JOB_ID, 0, 100); + assertEquals(1, persistedModelDebugOutput.count()); + assertEquals(modelDebugOutput, persistedModelDebugOutput.results().get(0)); + + Optional 
persistedModelSizeStats = jobProvider.modelSizeStats(JOB_ID); + assertTrue(persistedModelSizeStats.isPresent()); + assertEquals(modelSizeStats, persistedModelSizeStats.get()); + + QueryPage persistedModelSnapshot = jobProvider.modelSnapshots(JOB_ID, 0, 100); + assertEquals(1, persistedModelSnapshot.count()); + assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0)); + + Optional persistedQuantiles = jobProvider.getQuantiles(JOB_ID); + assertTrue(persistedQuantiles.isPresent()); + assertEquals(quantiles, persistedQuantiles.get()); + } + + public void testDeleteInterimResults() throws IOException, InterruptedException { + createJob(); + + AutoDetectResultProcessor resultProcessor = + new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(outputStream); + + Bucket nonInterimBucket = createBucket(false); + Bucket interimBucket = createBucket(true); + + ResultsBuilder resultBuilder = new ResultsBuilder() + .start() + .addRecords(createRecords(true)) + .addInfluencers(createInfluencers(true)) + .addBucket(interimBucket) // this will persist the interim results + .addFlushAcknowledgement(createFlushAcknowledgement()) + .addBucket(nonInterimBucket) // and this will delete the interim results + .end(); + + new Thread(() -> { + try { + writeResults(resultBuilder.build(), outputStream); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + + resultProcessor.process(JOB_ID, inputStream, false); + jobResultsPersister.commitWrites(JOB_ID); + + QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); + assertEquals(1, persistedBucket.count()); + assertEquals(nonInterimBucket, persistedBucket.results().get(0)); + + QueryPage persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build()); + assertEquals(0, 
persistedInfluencers.count()); + + QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); + assertEquals(0, persistedRecords.count()); + } + + public void testMultipleFlushesBetweenPersisting() throws IOException, InterruptedException { + createJob(); + + AutoDetectResultProcessor resultProcessor = + new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(outputStream); + + Bucket finalBucket = createBucket(true); + List finalAnomalyRecords = createRecords(true); + + ResultsBuilder resultBuilder = new ResultsBuilder() + .start() + .addRecords(createRecords(true)) + .addInfluencers(createInfluencers(true)) + .addBucket(createBucket(true)) // this will persist the interim results + .addFlushAcknowledgement(createFlushAcknowledgement()) + .addRecords(createRecords(true)) + .addBucket(createBucket(true)) // and this will delete the interim results and persist the new interim bucket & records + .addFlushAcknowledgement(createFlushAcknowledgement()) + .addRecords(finalAnomalyRecords) + .addBucket(finalBucket) // this deletes the previous interim and persists final bucket & records + .end(); + + new Thread(() -> { + try { + writeResults(resultBuilder.build(), outputStream); + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + + resultProcessor.process(JOB_ID, inputStream, false); + jobResultsPersister.commitWrites(JOB_ID); + + QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); + assertEquals(1, persistedBucket.count()); + assertEquals(finalBucket, persistedBucket.results().get(0)); + + QueryPage persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build()); + assertResultsAreSame(finalAnomalyRecords, persistedRecords); + } + + private void 
writeResults(XContentBuilder builder, OutputStream out) throws IOException { + builder.bytes().writeTo(out); + } + + private void createJob() { + Detector detector = new Detector.Builder("avg", "metric_field").build(); + Job.Builder jobBuilder = new Job.Builder(JOB_ID); + jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector))); + + jobProvider.createJobRelatedIndices(jobBuilder.build(), new ActionListener() { + @Override + public void onResponse(Boolean aBoolean) { + } + + @Override + public void onFailure(Exception e) { + } + }); + } + + private Bucket createBucket(boolean isInterim) { + Bucket bucket = new BucketTests().createTestInstance(JOB_ID); + bucket.setInterim(isInterim); + return bucket; + } + + private List createRecords(boolean isInterim) { + List records = new ArrayList<>(); + + int count = randomIntBetween(0, 100); + AnomalyRecordTests anomalyRecordGenerator = new AnomalyRecordTests(); + for (int i=0; i createInfluencers(boolean isInterim) { + List influencers = new ArrayList<>(); + + int count = randomIntBetween(0, 100); + InfluencerTests influencerGenerator = new InfluencerTests(); + for (int i=0; i records) throws IOException { + contentBuilder.startObject().field(AnomalyRecord.RESULTS_FIELD.getPreferredName(), records).endObject(); + return this; + } + + ResultsBuilder addInfluencers(List influencers) throws IOException { + contentBuilder.startObject().field(Influencer.RESULTS_FIELD.getPreferredName(), influencers).endObject(); + return this; + } + + ResultsBuilder addCategoryDefinition(CategoryDefinition definition) throws IOException { + contentBuilder.startObject().field(CategoryDefinition.TYPE.getPreferredName(), definition).endObject(); + return this; + } + + ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) throws IOException { + contentBuilder.startObject().field(ModelDebugOutput.RESULTS_FIELD.getPreferredName(), modelDebugOutput).endObject(); + return this; + } + + ResultsBuilder 
addModelSizeStats(ModelSizeStats modelSizeStats) throws IOException { + contentBuilder.startObject().field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats).endObject(); + return this; + } + + ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) throws IOException { + contentBuilder.startObject().field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot).endObject(); + return this; + } + + ResultsBuilder addQuantiles(Quantiles quantiles) throws IOException { + contentBuilder.startObject().field(Quantiles.TYPE.getPreferredName(), quantiles).endObject(); + return this; + } + + ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) throws IOException { + contentBuilder.startObject().field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement).endObject(); + return this; + } + + + ResultsBuilder end() throws IOException { + contentBuilder.endArray(); + return this; + } + + XContentBuilder build() throws IOException { + XContentBuilder result = contentBuilder; + contentBuilder = XContentFactory.jsonBuilder(); + return result; + } + } + + + private void assertResultsAreSame(List expected, QueryPage actual) { + assertEquals(expected.size(), actual.count()); + assertEquals(actual.results().size(), actual.count()); + Set expectedSet = new HashSet<>(expected); + expectedSet.removeAll(actual.results()); + assertEquals(0, expectedSet.size()); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 7c8832b909c..2c7fadb9842 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ 
-28,8 +28,8 @@ import java.util.List; import java.util.stream.Stream; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -91,6 +91,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); + context.deleteInterimRequired = true; AutodetectResult result = mock(AutodetectResult.class); Bucket bucket = mock(Bucket.class); when(result.getBucket()).thenReturn(bucket); @@ -121,6 +122,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processor.processResult(context, result); verify(bulkBuilder, times(1)).persistRecords(records); + verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); } @@ -144,6 +146,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class)); verify(bulkBuilder, times(1)).persistRecords(records); + verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); } @@ -164,6 +167,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { processor.processResult(context, result); verify(bulkBuilder, times(1)).persistInfluencers(influencers); + verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); } @@ -201,6 +205,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); verify(persister, times(1)).commitWrites(JOB_ID); + verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); 
assertTrue(context.deleteInterimRequired); } @@ -227,6 +232,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); inOrder.verify(persister, times(1)).commitWrites(JOB_ID); inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); + verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); assertTrue(context.deleteInterimRequired); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecordTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecordTests.java new file mode 100644 index 00000000000..f5abc2fa937 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/AnomalyRecordTests.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.prelert.job.results; + +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +public class AnomalyRecordTests extends AbstractSerializingTestCase { + + @Override + protected AnomalyRecord createTestInstance() { + return createTestInstance("foo", 1); + } + + public AnomalyRecord createTestInstance(String jobId, int sequenceNum) { + AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, new Date(randomPositiveLong()), randomPositiveLong(), sequenceNum); + anomalyRecord.setActual(Collections.singletonList(randomDouble())); + anomalyRecord.setTypical(Collections.singletonList(randomDouble())); + anomalyRecord.setAnomalyScore(randomDouble()); + anomalyRecord.setProbability(randomDouble()); + anomalyRecord.setNormalizedProbability(randomDouble()); + anomalyRecord.setInitialNormalizedProbability(randomDouble()); + anomalyRecord.setInterim(randomBoolean()); + if (randomBoolean()) { + anomalyRecord.setFieldName(randomAsciiOfLength(12)); + } + if (randomBoolean()) { + anomalyRecord.setByFieldName(randomAsciiOfLength(12)); + anomalyRecord.setByFieldValue(randomAsciiOfLength(12)); + } + if (randomBoolean()) { + anomalyRecord.setPartitionFieldName(randomAsciiOfLength(12)); + anomalyRecord.setPartitionFieldValue(randomAsciiOfLength(12)); + } + if (randomBoolean()) { + anomalyRecord.setOverFieldName(randomAsciiOfLength(12)); + anomalyRecord.setOverFieldValue(randomAsciiOfLength(12)); + } + anomalyRecord.setFunction(randomAsciiOfLengthBetween(5, 20)); + anomalyRecord.setFunctionDescription(randomAsciiOfLengthBetween(5, 20)); + if (randomBoolean()) { + anomalyRecord.setCorrelatedByFieldValue(randomAsciiOfLength(16)); + } + if (randomBoolean()) { + int 
count = randomIntBetween(0, 9); + List influences = new ArrayList<>(); + for (int i=0; i causes = new ArrayList<>(); + for (int i=0; i instanceReader() { + return AnomalyRecord::new; + } + + @Override + protected AnomalyRecord parseInstance(XContentParser parser, ParseFieldMatcher matcher) { + return AnomalyRecord.PARSER.apply(parser, () -> matcher); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java index 0b6f5a941fd..821979c543e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/BucketTests.java @@ -21,11 +21,12 @@ import java.util.Map; public class BucketTests extends AbstractSerializingTestCase { @Override - protected Bucket createTestInstance() { - int sequenceNum = 1; - String jobId = "foo"; - Bucket bucket = new Bucket(jobId, new Date(randomLong()), randomPositiveLong()); + public Bucket createTestInstance() { + return createTestInstance("foo"); + } + public Bucket createTestInstance(String jobId) { + Bucket bucket = new Bucket(jobId, new Date(randomPositiveLong()), randomPositiveLong()); if (randomBoolean()) { bucket.setAnomalyScore(randomDouble()); } @@ -80,14 +81,10 @@ public class BucketTests extends AbstractSerializingTestCase { } if (randomBoolean()) { int size = randomInt(10); + int sequenceNum = 1; List records = new ArrayList<>(size); for (int i = 0; i < size; i++) { - AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, bucket.getTimestamp(), bucket.getBucketSpan(), sequenceNum++); - anomalyRecord.setAnomalyScore(randomDouble()); - anomalyRecord.setActual(Collections.singletonList(randomDouble())); - anomalyRecord.setTypical(Collections.singletonList(randomDouble())); - anomalyRecord.setProbability(randomDouble()); - anomalyRecord.setInterim(randomBoolean()); + 
AnomalyRecord anomalyRecord = new AnomalyRecordTests().createTestInstance(jobId, sequenceNum++); records.add(anomalyRecord); } bucket.setRecords(records); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/CategoryDefinitionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/CategoryDefinitionTests.java index a86cbeb269e..17addb06be4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/CategoryDefinitionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/CategoryDefinitionTests.java @@ -14,9 +14,8 @@ import java.util.Arrays; public class CategoryDefinitionTests extends AbstractSerializingTestCase { - @Override - protected CategoryDefinition createTestInstance() { - CategoryDefinition categoryDefinition = new CategoryDefinition(randomAsciiOfLength(10)); + public CategoryDefinition createTestInstance(String jobId) { + CategoryDefinition categoryDefinition = new CategoryDefinition(jobId); categoryDefinition.setCategoryId(randomLong()); categoryDefinition.setTerms(randomAsciiOfLength(10)); categoryDefinition.setRegex(randomAsciiOfLength(10)); @@ -25,6 +24,11 @@ public class CategoryDefinitionTests extends AbstractSerializingTestCase instanceReader() { return CategoryDefinition::new; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java index 47fd87012c2..864b6769221 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/InfluencerTests.java @@ -14,10 +14,18 @@ import java.util.Date; public class InfluencerTests extends AbstractSerializingTestCase { + public Influencer createTestInstance(String jobId) { + Influencer influencer = new Influencer(jobId, 
randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), + new Date(randomPositiveLong()), randomPositiveLong(), randomIntBetween(1, 1000)); + influencer.setInterim(randomBoolean()); + influencer.setAnomalyScore(randomDouble()); + influencer.setInitialAnomalyScore(randomDouble()); + influencer.setProbability(randomDouble()); + return influencer; + } @Override protected Influencer createTestInstance() { - return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), - new Date(), randomPositiveLong(), randomIntBetween(1, 1000)); + return createTestInstance(randomAsciiOfLengthBetween(1, 20)); } @Override diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/ModelDebugOutputTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/ModelDebugOutputTests.java index 85e23bd46ed..d5debe74102 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/ModelDebugOutputTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/results/ModelDebugOutputTests.java @@ -15,7 +15,11 @@ public class ModelDebugOutputTests extends AbstractSerializingTestCase