AutodetectResultProcessor Integration Test (elastic/elasticsearch#516)

* Add results processor integration test

* Integration tests for AutodetectResultProcessor

Original commit: elastic/x-pack-elasticsearch@19e7ec48dd
This commit is contained in:
David Kyle 2016-12-12 16:55:20 +00:00 committed by GitHub
parent dae0b5625d
commit 2302dc78ba
10 changed files with 562 additions and 25 deletions

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder;
@ -133,10 +134,9 @@ public class JobDataDeleter {
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true); QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
SearchResponse searchResponse = client.prepareSearch(index) SearchResponse searchResponse = client.prepareSearch(index)
.setTypes(Result.RESULT_TYPE.getPreferredName()) .setTypes(Result.TYPE.getPreferredName())
.setQuery(qb) .setQuery(new ConstantScoreQueryBuilder(qb))
.setFetchSource(false) .setFetchSource(false)
.addSort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))
.setScroll(SCROLL_CONTEXT_DURATION) .setScroll(SCROLL_CONTEXT_DURATION)
.setSize(SCROLL_SIZE) .setSize(SCROLL_SIZE)
.get(); .get();

View File

@ -791,7 +791,7 @@ public class JobProvider {
try { try {
response = searchRequestBuilder.get(); response = searchRequestBuilder.get();
} catch (IndexNotFoundException e) { } catch (IndexNotFoundException e) {
throw new ResourceNotFoundException("job " + jobId + " not found"); throw ExceptionsHelper.missingJobException(jobId);
} }
List<Influencer> influencers = new ArrayList<>(); List<Influencer> influencers = new ArrayList<>();
@ -1039,6 +1039,40 @@ public class JobProvider {
return quantiles; return quantiles;
} }
public QueryPage<ModelDebugOutput> modelDebugOutput(String jobId, int from, int size) {
SearchResponse searchResponse;
try {
String indexName = JobResultsPersister.getJobIndexName(jobId);
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {]",
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
searchResponse = client.prepareSearch(indexName)
.setTypes(Result.TYPE.getPreferredName())
.setQuery(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ModelDebugOutput.RESULT_TYPE_VALUE))
.setFrom(from).setSize(size)
.get();
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
List<ModelDebugOutput> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse modelDebugOutput", e);
}
ModelDebugOutput modelDebugOutput = ModelDebugOutput.PARSER.apply(parser, () -> parseFieldMatcher);
results.add(modelDebugOutput);
}
return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), ModelDebugOutput.RESULTS_FIELD);
}
/** /**
* Get the job's model size stats. * Get the job's model size stats.
*/ */

View File

@ -85,7 +85,7 @@ public class AutoDetectResultProcessor {
LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount); LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
} }
} }
LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount); LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId); LOGGER.info("[{}] Parse results Complete", jobId);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e); LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e);
@ -110,9 +110,7 @@ public class AutoDetectResultProcessor {
// persist after deleting interim results in case the new // persist after deleting interim results in case the new
// results are also interim // results are also interim
context.bulkResultsPersister.persistBucket(bucket).executeRequest(); context.bulkResultsPersister.persistBucket(bucket).executeRequest();
context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId); context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId);
} }
List<AnomalyRecord> records = result.getRecords(); List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) { if (records != null && !records.isEmpty()) {
@ -208,7 +206,7 @@ public class AutoDetectResultProcessor {
Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) { Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) {
this.jobId = jobId; this.jobId = jobId;
this.isPerPartitionNormalization = isPerPartitionNormalization; this.isPerPartitionNormalization = isPerPartitionNormalization;
this.deleteInterimRequired = true; this.deleteInterimRequired = false;
this.bulkResultsPersister = bulkResultsPersister; this.bulkResultsPersister = bulkResultsPersister;
} }
} }

View File

@ -0,0 +1,403 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.integration;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.noop.NoOpRenormaliser;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecordTests;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketTests;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinitionTests;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.InfluencerTests;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutputTests;
import org.junit.Before;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private static final String JOB_ID = "foo";
private Renormaliser renormaliser;
private JobResultsPersister jobResultsPersister;
private AutodetectResultsParser autodetectResultsParser;
private JobProvider jobProvider;
@Before
private void createComponents() {
renormaliser = new NoOpRenormaliser();
jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
ParseFieldMatcher matcher = new ParseFieldMatcher(nodeSettings());
autodetectResultsParser = new AutodetectResultsParser(nodeSettings(), () -> matcher);
jobProvider = new JobProvider(client(), 1, matcher);
}
public void testProcessResults() throws IOException {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket bucket = createBucket(false);
assertNotNull(bucket);
List<AnomalyRecord> records = createRecords(false);
List<Influencer> influencers = createInfluencers(false);
CategoryDefinition categoryDefinition = createCategoryDefinition();
ModelDebugOutput modelDebugOutput = createModelDebugOutput();
ModelSizeStats modelSizeStats = createModelSizeStats();
ModelSnapshot modelSnapshot = createModelSnapshot();
Quantiles quantiles = createQuantiles();
// Add the bucket last as the bucket result triggers persistence
ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(records)
.addInfluencers(influencers)
.addCategoryDefinition(categoryDefinition)
.addModelDebugOutput(modelDebugOutput)
.addModelSizeStats(modelSizeStats)
.addModelSnapshot(modelSnapshot)
.addQuantiles(quantiles)
.addBucket(bucket)
.end();
new Thread(() -> {
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
assertEquals(bucket, persistedBucket.results().get(0));
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
assertResultsAreSame(records, persistedRecords);
QueryPage<Influencer> persistedInfluencers =
jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().includeInterim(true).build());
assertResultsAreSame(influencers, persistedInfluencers);
QueryPage<CategoryDefinition> persistedDefinition =
jobProvider.categoryDefinition(JOB_ID, Long.toString(categoryDefinition.getCategoryId()));
assertEquals(1, persistedDefinition.count());
assertEquals(categoryDefinition, persistedDefinition.results().get(0));
QueryPage<ModelDebugOutput> persistedModelDebugOutput = jobProvider.modelDebugOutput(JOB_ID, 0, 100);
assertEquals(1, persistedModelDebugOutput.count());
assertEquals(modelDebugOutput, persistedModelDebugOutput.results().get(0));
Optional<ModelSizeStats> persistedModelSizeStats = jobProvider.modelSizeStats(JOB_ID);
assertTrue(persistedModelSizeStats.isPresent());
assertEquals(modelSizeStats, persistedModelSizeStats.get());
QueryPage<ModelSnapshot> persistedModelSnapshot = jobProvider.modelSnapshots(JOB_ID, 0, 100);
assertEquals(1, persistedModelSnapshot.count());
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
Optional<Quantiles> persistedQuantiles = jobProvider.getQuantiles(JOB_ID);
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
}
public void testDeleteInterimResults() throws IOException, InterruptedException {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true);
ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(createRecords(true))
.addInfluencers(createInfluencers(true))
.addBucket(interimBucket) // this will persist the interim results
.addFlushAcknowledgement(createFlushAcknowledgement())
.addBucket(nonInterimBucket) // and this will delete the interim results
.end();
new Thread(() -> {
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
assertEquals(nonInterimBucket, persistedBucket.results().get(0));
QueryPage<Influencer> persistedInfluencers = jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build());
assertEquals(0, persistedInfluencers.count());
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
assertEquals(0, persistedRecords.count());
}
public void testMultipleFlushesBetweenPersisting() throws IOException, InterruptedException {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
Bucket finalBucket = createBucket(true);
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
ResultsBuilder resultBuilder = new ResultsBuilder()
.start()
.addRecords(createRecords(true))
.addInfluencers(createInfluencers(true))
.addBucket(createBucket(true)) // this will persist the interim results
.addFlushAcknowledgement(createFlushAcknowledgement())
.addRecords(createRecords(true))
.addBucket(createBucket(true)) // and this will delete the interim results and persist the new interim bucket & records
.addFlushAcknowledgement(createFlushAcknowledgement())
.addRecords(finalAnomalyRecords)
.addBucket(finalBucket) // this deletes the previous interim and persists final bucket & records
.end();
new Thread(() -> {
try {
writeResults(resultBuilder.build(), outputStream);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
assertEquals(finalBucket, persistedBucket.results().get(0));
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID, new RecordsQueryBuilder().includeInterim(true).build());
assertResultsAreSame(finalAnomalyRecords, persistedRecords);
}
private void writeResults(XContentBuilder builder, OutputStream out) throws IOException {
builder.bytes().writeTo(out);
}
private void createJob() {
Detector detector = new Detector.Builder("avg", "metric_field").build();
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
jobProvider.createJobRelatedIndices(jobBuilder.build(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
}
@Override
public void onFailure(Exception e) {
}
});
}
private Bucket createBucket(boolean isInterim) {
Bucket bucket = new BucketTests().createTestInstance(JOB_ID);
bucket.setInterim(isInterim);
return bucket;
}
private List<AnomalyRecord> createRecords(boolean isInterim) {
List<AnomalyRecord> records = new ArrayList<>();
int count = randomIntBetween(0, 100);
AnomalyRecordTests anomalyRecordGenerator = new AnomalyRecordTests();
for (int i=0; i<count; i++) {
AnomalyRecord r = anomalyRecordGenerator.createTestInstance(JOB_ID, i);
r.setInterim(isInterim);
records.add(r);
}
return records;
}
private List<Influencer> createInfluencers(boolean isInterim) {
List<Influencer> influencers = new ArrayList<>();
int count = randomIntBetween(0, 100);
InfluencerTests influencerGenerator = new InfluencerTests();
for (int i=0; i<count; i++) {
Influencer influencer = influencerGenerator.createTestInstance(JOB_ID);
influencer.setInterim(isInterim);
influencers.add(influencer);
}
return influencers;
}
private CategoryDefinition createCategoryDefinition() {
return new CategoryDefinitionTests().createTestInstance(JOB_ID);
}
private ModelDebugOutput createModelDebugOutput() {
return new ModelDebugOutputTests().createTestInstance(JOB_ID);
}
private ModelSizeStats createModelSizeStats() {
ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
builder.setId(randomAsciiOfLength(20));
builder.setTimestamp(new Date(randomPositiveLong()));
builder.setLogTime(new Date(randomPositiveLong()));
builder.setBucketAllocationFailuresCount(randomPositiveLong());
builder.setModelBytes(randomPositiveLong());
builder.setTotalByFieldCount(randomPositiveLong());
builder.setTotalOverFieldCount(randomPositiveLong());
builder.setTotalPartitionFieldCount(randomPositiveLong());
builder.setMemoryStatus(randomFrom(EnumSet.allOf(ModelSizeStats.MemoryStatus.class)));
return builder.build();
}
private ModelSnapshot createModelSnapshot() {
ModelSnapshot snapshot = new ModelSnapshot(JOB_ID);
snapshot.setSnapshotId(randomAsciiOfLength(12));
return snapshot;
}
private Quantiles createQuantiles() {
return new Quantiles(JOB_ID, new Date(randomPositiveLong()), randomAsciiOfLength(100));
}
private FlushAcknowledgement createFlushAcknowledgement() {
return new FlushAcknowledgement(randomAsciiOfLength(5));
}
private class ResultsBuilder {
private XContentBuilder contentBuilder;
private ResultsBuilder() throws IOException {
contentBuilder = XContentFactory.jsonBuilder();
}
ResultsBuilder start() throws IOException {
contentBuilder.startArray();
return this;
}
ResultsBuilder addBucket(Bucket bucket) throws IOException {
contentBuilder.startObject().field(Bucket.RESULT_TYPE_FIELD.getPreferredName(), bucket).endObject();
return this;
}
ResultsBuilder addRecords(List<AnomalyRecord> records) throws IOException {
contentBuilder.startObject().field(AnomalyRecord.RESULTS_FIELD.getPreferredName(), records).endObject();
return this;
}
ResultsBuilder addInfluencers(List<Influencer> influencers) throws IOException {
contentBuilder.startObject().field(Influencer.RESULTS_FIELD.getPreferredName(), influencers).endObject();
return this;
}
ResultsBuilder addCategoryDefinition(CategoryDefinition definition) throws IOException {
contentBuilder.startObject().field(CategoryDefinition.TYPE.getPreferredName(), definition).endObject();
return this;
}
ResultsBuilder addModelDebugOutput(ModelDebugOutput modelDebugOutput) throws IOException {
contentBuilder.startObject().field(ModelDebugOutput.RESULTS_FIELD.getPreferredName(), modelDebugOutput).endObject();
return this;
}
ResultsBuilder addModelSizeStats(ModelSizeStats modelSizeStats) throws IOException {
contentBuilder.startObject().field(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName(), modelSizeStats).endObject();
return this;
}
ResultsBuilder addModelSnapshot(ModelSnapshot modelSnapshot) throws IOException {
contentBuilder.startObject().field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot).endObject();
return this;
}
ResultsBuilder addQuantiles(Quantiles quantiles) throws IOException {
contentBuilder.startObject().field(Quantiles.TYPE.getPreferredName(), quantiles).endObject();
return this;
}
ResultsBuilder addFlushAcknowledgement(FlushAcknowledgement flushAcknowledgement) throws IOException {
contentBuilder.startObject().field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement).endObject();
return this;
}
ResultsBuilder end() throws IOException {
contentBuilder.endArray();
return this;
}
XContentBuilder build() throws IOException {
XContentBuilder result = contentBuilder;
contentBuilder = XContentFactory.jsonBuilder();
return result;
}
}
private <T extends ToXContent & Writeable> void assertResultsAreSame(List<T> expected, QueryPage<T> actual) {
assertEquals(expected.size(), actual.count());
assertEquals(actual.results().size(), actual.count());
Set<T> expectedSet = new HashSet<>(expected);
expectedSet.removeAll(actual.results());
assertEquals(0, expectedSet.size());
}
}

View File

@ -28,8 +28,8 @@ import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -91,6 +91,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = true;
AutodetectResult result = mock(AutodetectResult.class); AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class); Bucket bucket = mock(Bucket.class);
when(result.getBucket()).thenReturn(bucket); when(result.getBucket()).thenReturn(bucket);
@ -121,6 +122,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result); processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistRecords(records); verify(bulkBuilder, times(1)).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
} }
@ -144,6 +146,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class)); verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(bulkBuilder, times(1)).persistRecords(records); verify(bulkBuilder, times(1)).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
} }
@ -164,6 +167,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result); processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistInfluencers(influencers); verify(bulkBuilder, times(1)).persistInfluencers(influencers);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
} }
@ -201,6 +205,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(persister, times(1)).commitWrites(JOB_ID); verify(persister, times(1)).commitWrites(JOB_ID);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired); assertTrue(context.deleteInterimRequired);
} }
@ -227,6 +232,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
inOrder.verify(persister, times(1)).commitWrites(JOB_ID); inOrder.verify(persister, times(1)).commitWrites(JOB_ID);
inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
assertTrue(context.deleteInterimRequired); assertTrue(context.deleteInterimRequired);
} }

View File

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.results;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
public class AnomalyRecordTests extends AbstractSerializingTestCase<AnomalyRecord> {
@Override
protected AnomalyRecord createTestInstance() {
return createTestInstance("foo", 1);
}
public AnomalyRecord createTestInstance(String jobId, int sequenceNum) {
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, new Date(randomPositiveLong()), randomPositiveLong(), sequenceNum);
anomalyRecord.setActual(Collections.singletonList(randomDouble()));
anomalyRecord.setTypical(Collections.singletonList(randomDouble()));
anomalyRecord.setAnomalyScore(randomDouble());
anomalyRecord.setProbability(randomDouble());
anomalyRecord.setNormalizedProbability(randomDouble());
anomalyRecord.setInitialNormalizedProbability(randomDouble());
anomalyRecord.setInterim(randomBoolean());
if (randomBoolean()) {
anomalyRecord.setFieldName(randomAsciiOfLength(12));
}
if (randomBoolean()) {
anomalyRecord.setByFieldName(randomAsciiOfLength(12));
anomalyRecord.setByFieldValue(randomAsciiOfLength(12));
}
if (randomBoolean()) {
anomalyRecord.setPartitionFieldName(randomAsciiOfLength(12));
anomalyRecord.setPartitionFieldValue(randomAsciiOfLength(12));
}
if (randomBoolean()) {
anomalyRecord.setOverFieldName(randomAsciiOfLength(12));
anomalyRecord.setOverFieldValue(randomAsciiOfLength(12));
}
anomalyRecord.setFunction(randomAsciiOfLengthBetween(5, 20));
anomalyRecord.setFunctionDescription(randomAsciiOfLengthBetween(5, 20));
if (randomBoolean()) {
anomalyRecord.setCorrelatedByFieldValue(randomAsciiOfLength(16));
}
if (randomBoolean()) {
int count = randomIntBetween(0, 9);
List<Influence> influences = new ArrayList<>();
for (int i=0; i<count; i++) {
influences.add(new Influence(randomAsciiOfLength(8), Collections.singletonList(randomAsciiOfLengthBetween(1, 28))));
}
anomalyRecord.setInfluencers(influences);
}
if (randomBoolean()) {
int count = randomIntBetween(0, 9);
List<AnomalyCause> causes = new ArrayList<>();
for (int i=0; i<count; i++) {
causes.add(new AnomalyCauseTests().createTestInstance());
}
anomalyRecord.setCauses(causes);
}
return anomalyRecord;
}
@Override
protected Writeable.Reader<AnomalyRecord> instanceReader() {
return AnomalyRecord::new;
}
@Override
protected AnomalyRecord parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return AnomalyRecord.PARSER.apply(parser, () -> matcher);
}
}

View File

@ -21,11 +21,12 @@ import java.util.Map;
public class BucketTests extends AbstractSerializingTestCase<Bucket> { public class BucketTests extends AbstractSerializingTestCase<Bucket> {
@Override @Override
protected Bucket createTestInstance() { public Bucket createTestInstance() {
int sequenceNum = 1; return createTestInstance("foo");
String jobId = "foo"; }
Bucket bucket = new Bucket(jobId, new Date(randomLong()), randomPositiveLong());
public Bucket createTestInstance(String jobId) {
Bucket bucket = new Bucket(jobId, new Date(randomPositiveLong()), randomPositiveLong());
if (randomBoolean()) { if (randomBoolean()) {
bucket.setAnomalyScore(randomDouble()); bucket.setAnomalyScore(randomDouble());
} }
@ -80,14 +81,10 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
} }
if (randomBoolean()) { if (randomBoolean()) {
int size = randomInt(10); int size = randomInt(10);
int sequenceNum = 1;
List<AnomalyRecord> records = new ArrayList<>(size); List<AnomalyRecord> records = new ArrayList<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
AnomalyRecord anomalyRecord = new AnomalyRecord(jobId, bucket.getTimestamp(), bucket.getBucketSpan(), sequenceNum++); AnomalyRecord anomalyRecord = new AnomalyRecordTests().createTestInstance(jobId, sequenceNum++);
anomalyRecord.setAnomalyScore(randomDouble());
anomalyRecord.setActual(Collections.singletonList(randomDouble()));
anomalyRecord.setTypical(Collections.singletonList(randomDouble()));
anomalyRecord.setProbability(randomDouble());
anomalyRecord.setInterim(randomBoolean());
records.add(anomalyRecord); records.add(anomalyRecord);
} }
bucket.setRecords(records); bucket.setRecords(records);

View File

@ -14,9 +14,8 @@ import java.util.Arrays;
public class CategoryDefinitionTests extends AbstractSerializingTestCase<CategoryDefinition> { public class CategoryDefinitionTests extends AbstractSerializingTestCase<CategoryDefinition> {
@Override public CategoryDefinition createTestInstance(String jobId) {
protected CategoryDefinition createTestInstance() { CategoryDefinition categoryDefinition = new CategoryDefinition(jobId);
CategoryDefinition categoryDefinition = new CategoryDefinition(randomAsciiOfLength(10));
categoryDefinition.setCategoryId(randomLong()); categoryDefinition.setCategoryId(randomLong());
categoryDefinition.setTerms(randomAsciiOfLength(10)); categoryDefinition.setTerms(randomAsciiOfLength(10));
categoryDefinition.setRegex(randomAsciiOfLength(10)); categoryDefinition.setRegex(randomAsciiOfLength(10));
@ -25,6 +24,11 @@ public class CategoryDefinitionTests extends AbstractSerializingTestCase<Categor
return categoryDefinition; return categoryDefinition;
} }
@Override
protected CategoryDefinition createTestInstance() {
return createTestInstance(randomAsciiOfLength(10));
}
@Override @Override
protected Writeable.Reader<CategoryDefinition> instanceReader() { protected Writeable.Reader<CategoryDefinition> instanceReader() {
return CategoryDefinition::new; return CategoryDefinition::new;

View File

@ -14,10 +14,18 @@ import java.util.Date;
public class InfluencerTests extends AbstractSerializingTestCase<Influencer> { public class InfluencerTests extends AbstractSerializingTestCase<Influencer> {
public Influencer createTestInstance(String jobId) {
Influencer influencer = new Influencer(jobId, randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
new Date(randomPositiveLong()), randomPositiveLong(), randomIntBetween(1, 1000));
influencer.setInterim(randomBoolean());
influencer.setAnomalyScore(randomDouble());
influencer.setInitialAnomalyScore(randomDouble());
influencer.setProbability(randomDouble());
return influencer;
}
@Override @Override
protected Influencer createTestInstance() { protected Influencer createTestInstance() {
return new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), return createTestInstance(randomAsciiOfLengthBetween(1, 20));
new Date(), randomPositiveLong(), randomIntBetween(1, 1000));
} }
@Override @Override

View File

@ -15,7 +15,11 @@ public class ModelDebugOutputTests extends AbstractSerializingTestCase<ModelDebu
@Override @Override
protected ModelDebugOutput createTestInstance() { protected ModelDebugOutput createTestInstance() {
ModelDebugOutput modelDebugOutput = new ModelDebugOutput("foo"); return createTestInstance("foo");
}
public ModelDebugOutput createTestInstance(String jobId) {
ModelDebugOutput modelDebugOutput = new ModelDebugOutput(jobId);
if (randomBoolean()) { if (randomBoolean()) {
modelDebugOutput.setByFieldName(randomAsciiOfLengthBetween(1, 20)); modelDebugOutput.setByFieldName(randomAsciiOfLengthBetween(1, 20));
} }