[ML] Normalize records and buckets separately (elastic/x-pack-elasticsearch#1524)

Previously we used to normalize records with their buckets.  This required
nested scrolls: an outer scroll over buckets, then a nested scroll for
records in each bucket.  This was fragile.

The new approach is to simply scroll first through buckets, then through
records.  This is made possible because we no longer store max_record_score
on buckets nor bucket anomaly_score on records.

While making these changes I noticed that the PerPartitionMaxProbabilities
class was redundant (because it was storing max_record_score in the case of
per-partition normalization), so I removed it.  I also removed a redundant
Map from the Bucket class and fixed its equals() and hashCode() methods.

relates elastic/x-pack-elasticsearch#1115

Original commit: elastic/x-pack-elasticsearch@efbee63573
This commit is contained in:
David Roberts 2017-05-25 08:50:56 +01:00 committed by GitHub
parent aae7cf0b0f
commit 1bfc864193
25 changed files with 154 additions and 714 deletions

View File

@ -15,7 +15,6 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.ArrayDeque;

View File

@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influence;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
@ -259,19 +258,6 @@ public class ElasticsearchMappings {
.endObject()
.endObject()
// per-partition max probabilities mapping
.startObject(PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName())
.field(TYPE, NESTED)
.startObject(PROPERTIES)
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
// Model Plot Output
.startObject(ModelPlot.MODEL_FEATURE.getPreferredName())
.field(TYPE, KEYWORD)

View File

@ -17,7 +17,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -73,7 +72,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.security.support.Exceptions;
@ -388,24 +386,9 @@ public class JobProvider {
searchSourceBuilder.sort(Result.TIMESTAMP.getPreferredName(), query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
}
searchRequest.source(searchSourceBuilder);
searchRequest.indicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
MultiSearchRequest mrequest = new MultiSearchRequest();
mrequest.indicesOptions(addIgnoreUnavailable(mrequest.indicesOptions()));
mrequest.add(searchRequest);
if (Strings.hasLength(query.getPartitionValue())) {
mrequest.add(createPartitionMaxNormailizedProbabilitiesRequest(jobId, query.getStart(), query.getEnd(),
query.getPartitionValue()));
}
client.multiSearch(mrequest, ActionListener.wrap(mresponse -> {
MultiSearchResponse.Item item1 = mresponse.getResponses()[0];
if (item1.isFailure()) {
errorHandler.accept(mapAuthFailure(item1.getFailure(), jobId, GetBucketsAction.NAME));
return;
}
SearchResponse searchResponse = item1.getResponse();
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
if (query.getTimestamp() != null) {
if (hits.getTotalHits() == 0) {
@ -433,29 +416,15 @@ public class JobProvider {
}
QueryPage<Bucket> buckets = new QueryPage<>(results, searchResponse.getHits().getTotalHits(), Bucket.RESULTS_FIELD);
if (Strings.hasLength(query.getPartitionValue())) {
MultiSearchResponse.Item item2 = mresponse.getResponses()[1];
if (item2.isFailure()) {
errorHandler.accept(item2.getFailure());
return;
}
if (query.isExpand()) {
Iterator<Bucket> bucketsToExpand = buckets.results().stream()
.filter(bucket -> bucket.getRecordCount() > 0).iterator();
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
return;
}
if (query.isExpand()) {
Iterator<Bucket> bucketsToExpand = buckets.results().stream()
.filter(bucket -> bucket.getRecordCount() > 0).iterator();
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
} else {
if (query.isExpand()) {
Iterator<Bucket> bucketsToExpand = buckets.results().stream()
.filter(bucket -> bucket.getRecordCount() > 0).iterator();
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
return;
}
handler.accept(buckets);
}
handler.accept(buckets);
}, errorHandler));
}, e -> { errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME)); }));
}
private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
@ -470,28 +439,6 @@ public class JobProvider {
}
}
private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue) {
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()
.timeRange(Result.TIMESTAMP.getPreferredName(), epochStart, epochEnd)
.build();
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(timeRangeQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), PerPartitionMaxProbabilities.RESULT_TYPE_VALUE))
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
FieldSortBuilder sb = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(sb);
sourceBuilder.query(boolQuery);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(sourceBuilder);
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
return searchRequest;
}
/**
* Returns a {@link BatchedDocumentsIterator} that allows querying
* and iterating over a large number of buckets of the given job.

View File

@ -36,11 +36,18 @@ import static org.elasticsearch.xpack.ml.job.persistence.ElasticsearchMappings.D
*/
public class JobRenormalizedResultsPersister extends AbstractComponent {
/**
* Execute bulk requests when they reach this size
*/
private static final int BULK_LIMIT = 10000;
private final String jobId;
private final Client client;
private BulkRequest bulkRequest;
public JobRenormalizedResultsPersister(Settings settings, Client client) {
public JobRenormalizedResultsPersister(String jobId, Settings settings, Client client) {
super(settings);
this.jobId = jobId;
this.client = client;
bulkRequest = new BulkRequest();
}
@ -70,6 +77,9 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
} catch (IOException e) {
logger.error("Error serialising result", e);
}
if (bulkRequest.numberOfActions() >= BULK_LIMIT) {
executeRequest();
}
}
private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
@ -80,10 +90,8 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
/**
* Execute the bulk action
*
* @param jobId The job Id
*/
public void executeRequest(String jobId) {
public void executeRequest() {
if (bulkRequest.numberOfActions() == 0) {
return;
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.io.IOException;
@ -171,28 +170,6 @@ public class JobResultsPersister extends AbstractComponent {
return this;
}
/**
* Persist {@link PerPartitionMaxProbabilities}
*
* @param partitionProbabilities The probabilities to persist
* @return this
*/
public Builder persistPerPartitionMaxProbabilities(PerPartitionMaxProbabilities partitionProbabilities) {
try (XContentBuilder builder = toXContentBuilder(partitionProbabilities)) {
logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {} with ID {}",
jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp(),
partitionProbabilities.getId());
bulkRequest.add(
new IndexRequest(indexName, DOC_TYPE, partitionProbabilities.getId()).source(builder));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] error serialising bucket per partition max normalized scores",
new Object[]{jobId}), e);
}
return this;
}
/**
* Execute the bulk action
*/

View File

@ -293,8 +293,8 @@ public class AutodetectProcessManager extends AbstractComponent {
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider,
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
@ -302,13 +302,12 @@ public class AutodetectProcessManager extends AbstractComponent {
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(),
autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime,
autoDetectExecutorService, () -> setJobState(jobTask, JobState.FAILED));
boolean usePerPartitionNormalization = job.getAnalysisConfig().getUsePerPartitionNormalization();
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
client, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats());
ExecutorService autodetectWorkerExecutor;
try {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process, usePerPartitionNormalization));
autoDetectExecutorService.submit(() -> processor.process(process));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.

View File

@ -27,7 +27,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import java.time.Duration;
import java.util.Iterator;
@ -40,7 +39,7 @@ import java.util.concurrent.TimeoutException;
/**
* A runnable class that reads the autodetect process output in the
* {@link #process(AutodetectProcess, boolean)} method and persists parsed
* {@link #process(AutodetectProcess)} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p>
* Has methods to register and remove alert observers.
@ -89,8 +88,8 @@ public class AutoDetectResultProcessor {
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
}
public void process(AutodetectProcess process, boolean isPerPartitionNormalization) {
Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId));
public void process(AutodetectProcess process) {
Context context = new Context(jobId, persister.bulkPersisterBuilder(jobId));
// If a function call in this throws for some reason we don't want it
// to kill the results reader thread as autodetect will be blocked
@ -175,9 +174,6 @@ public class AutoDetectResultProcessor {
List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) {
context.bulkResultsPersister.persistRecords(records);
if (context.isPerPartitionNormalization) {
context.bulkResultsPersister.persistPerPartitionMaxProbabilities(new PerPartitionMaxProbabilities(records));
}
}
List<Influencer> influencers = result.getInfluencers();
if (influencers != null && !influencers.isEmpty()) {
@ -306,14 +302,12 @@ public class AutoDetectResultProcessor {
static class Context {
private final String jobId;
private final boolean isPerPartitionNormalization;
private JobResultsPersister.Builder bulkResultsPersister;
boolean deleteInterimRequired;
Context(String jobId, boolean isPerPartitionNormalization, JobResultsPersister.Builder bulkResultsPersister) {
Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) {
this.jobId = jobId;
this.isPerPartitionNormalization = isPerPartitionNormalization;
this.deleteInterimRequired = true;
this.bulkResultsPersister = bulkResultsPersister;
}

View File

@ -31,11 +31,11 @@ abstract class AbstractLeafNormalizable extends Normalizable {
@Override
public final List<Normalizable> getChildren(ChildType type) {
throw new IllegalStateException(getClass().getSimpleName() + " has no children");
throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children");
}
@Override
public final boolean setMaxChildrenScore(ChildType childrenType, double maxScore) {
throw new IllegalStateException(getClass().getSimpleName() + " has no children");
throw new UnsupportedOperationException(getClass().getSimpleName() + " has no children");
}
}

View File

@ -11,24 +11,20 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.BUCKET_INFLUENCER;
import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.PARTITION_SCORE;
import static org.elasticsearch.xpack.ml.job.process.normalizer.Normalizable.ChildType.RECORD;
public class BucketNormalizable extends Normalizable {
private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);
private static final List<ChildType> CHILD_TYPES = Arrays.asList(BUCKET_INFLUENCER, PARTITION_SCORE);
private final Bucket bucket;
private List<RecordNormalizable> records = Collections.emptyList();
public BucketNormalizable(Bucket bucket, String indexName) {
super(indexName);
this.bucket = Objects.requireNonNull(bucket);
@ -80,7 +76,7 @@ public class BucketNormalizable extends Normalizable {
@Override
public double getProbability() {
throw new IllegalStateException("Bucket is container only");
throw new UnsupportedOperationException("Bucket is container only");
}
@Override
@ -93,14 +89,6 @@ public class BucketNormalizable extends Normalizable {
bucket.setAnomalyScore(normalizedScore);
}
public List<RecordNormalizable> getRecords() {
return records;
}
public void setRecords(List<RecordNormalizable> records) {
this.records = records;
}
@Override
public List<ChildType> getChildrenTypes() {
return CHILD_TYPES;
@ -124,9 +112,6 @@ public class BucketNormalizable extends Normalizable {
.map(bi -> new BucketInfluencerNormalizable(bi, getOriginatingIndex()))
.collect(Collectors.toList()));
break;
case RECORD:
children.addAll(records);
break;
case PARTITION_SCORE:
children.addAll(bucket.getPartitionScores().stream()
.map(ps -> new PartitionScoreNormalizable(ps, getOriginatingIndex()))
@ -145,7 +130,6 @@ public class BucketNormalizable extends Normalizable {
double oldScore = bucket.getAnomalyScore();
bucket.setAnomalyScore(maxScore);
return maxScore != oldScore;
case RECORD:
case PARTITION_SCORE:
return false;
default:
@ -156,7 +140,7 @@ public class BucketNormalizable extends Normalizable {
@Override
public void setParentScore(double parentScore) {
throw new IllegalStateException("Bucket has no parent");
throw new UnsupportedOperationException("Bucket has no parent");
}
@Override

View File

@ -52,7 +52,7 @@ public class Normalizer {
* normalizer
*/
public void normalize(Integer bucketSpan, boolean perPartitionNormalization,
List<Normalizable> results, String quantilesState) {
List<? extends Normalizable> results, String quantilesState) {
NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan,
perPartitionNormalization, executorService);
NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler();
@ -123,7 +123,7 @@ public class Normalizer {
* Updates the normalized scores on the results.
*/
private void mergeNormalizedScoresIntoResults(List<NormalizerResult> normalizedScores,
List<Normalizable> results) {
List<? extends Normalizable> results) {
Iterator<NormalizerResult> scoresIter = normalizedScores.iterator();
for (Normalizable result : results) {
mergeRecursively(scoresIter, null, false, result);

View File

@ -22,7 +22,7 @@ public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
@Override
public String getId() {
throw new IllegalStateException("PartitionScore has no ID as is should not be persisted outside of the owning bucket");
throw new UnsupportedOperationException("PartitionScore has no ID as it should not be persisted outside of the owning bucket");
}
@Override

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -15,7 +14,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import java.util.ArrayList;
@ -32,9 +30,9 @@ public class ScoresUpdater {
private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class);
/**
* Target number of records to renormalize at a time
* Target number of buckets to renormalize at a time
*/
private static final int TARGET_RECORDS_TO_RENORMALIZE = 100000;
private static final int TARGET_BUCKETS_TO_RENORMALIZE = 100000;
// 30 days
private static final long DEFAULT_RENORMALIZATION_WINDOW_MS = 2592000000L;
@ -44,24 +42,22 @@ public class ScoresUpdater {
private static final long SECONDS_IN_DAY = 86400;
private static final long MILLISECONDS_IN_SECOND = 1000;
private final Job job;
private final String jobId;
private final JobProvider jobProvider;
private final JobRenormalizedResultsPersister updatesPersister;
private final NormalizerFactory normalizerFactory;
private int bucketSpan;
private long normalizationWindow;
private boolean perPartitionNormalization;
private volatile boolean shutdown;
public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
NormalizerFactory normalizerFactory) {
this.job = job;
jobId = job.getId();
this.jobProvider = Objects.requireNonNull(jobProvider);
updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister);
this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue();
normalizationWindow = getNormalizationWindowOrDefault(job);
perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig());
}
/**
@ -79,162 +75,118 @@ public class ScoresUpdater {
DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW * bucketSpan * MILLISECONDS_IN_SECOND);
}
private static boolean getPerPartitionNormalizationOrDefault(AnalysisConfig analysisConfig) {
return (analysisConfig != null) && analysisConfig.getUsePerPartitionNormalization();
}
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
Normalizer normalizer = normalizerFactory.create(job.getId());
Normalizer normalizer = normalizerFactory.create(jobId);
int[] counts = {0, 0};
updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts,
perPartitionNormalization);
updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts);
updateBuckets(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
updateRecords(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
updateInfluencers(normalizer, quantilesState, endBucketEpochMs, windowExtensionMs, counts, perPartitionNormalization);
LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]);
// The updates will have been persisted in batches throughout the renormalization
// process - this call just catches any leftovers
updatesPersister.executeRequest();
LOGGER.debug("[{}] Normalization resulted in: {} updates, {} no-ops", jobId, counts[0], counts[1]);
}
private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
BatchedDocumentsIterator<Result<Bucket>> bucketsIterator =
jobProvider.newBatchedBucketsIterator(job.getId())
jobProvider.newBatchedBucketsIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false);
// Make a list of buckets to be renormalized.
// This may be shorter than the original list of buckets for two
// reasons:
// 1) We don't bother with buckets that have raw score 0 and no
// records
// 2) We limit the total number of records to be not much more
// than 100000
List<BucketNormalizable> bucketsToRenormalize = new ArrayList<>();
int batchRecordCount = 0;
int skipped = 0;
while (bucketsIterator.hasNext() && shutdown == false) {
// Get a batch of buckets without their records to calculate
// how many buckets can be sensibly retrieved
Deque<Result<Bucket>> buckets = bucketsIterator.next();
if (buckets.isEmpty()) {
LOGGER.debug("[{}] No buckets to renormalize for job", jobId);
break;
}
while (!buckets.isEmpty() && shutdown == false) {
Result<Bucket> current = buckets.removeFirst();
Bucket currentBucket = current.result;
if (currentBucket.isNormalizable()) {
BucketNormalizable bucketNormalizable = new BucketNormalizable(current.result, current.index);
List<RecordNormalizable> recordNormalizables =
bucketRecordsAsNormalizables(currentBucket.getTimestamp().getTime());
batchRecordCount += recordNormalizables.size();
bucketNormalizable.setRecords(recordNormalizables);
bucketsToRenormalize.add(bucketNormalizable);
} else {
++skipped;
}
if (batchRecordCount >= TARGET_RECORDS_TO_RENORMALIZE) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
batchRecordCount, skipped, counts, perPartitionNormalization);
bucketsToRenormalize = new ArrayList<>();
batchRecordCount = 0;
skipped = 0;
if (current.result.isNormalizable()) {
bucketsToRenormalize.add(new BucketNormalizable(current.result, current.index));
if (bucketsToRenormalize.size() >= TARGET_BUCKETS_TO_RENORMALIZE) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization);
bucketsToRenormalize.clear();
}
}
}
}
if (!bucketsToRenormalize.isEmpty()) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, batchRecordCount, skipped, counts,
perPartitionNormalization);
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState, counts, perPartitionNormalization);
}
}
private List<RecordNormalizable> bucketRecordsAsNormalizables(long bucketTimeStamp) {
BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(job.getId())
.timeRange(bucketTimeStamp, bucketTimeStamp + 1)
.includeInterim(false);
List<RecordNormalizable> recordNormalizables = new ArrayList<>();
while (recordsIterator.hasNext() && shutdown == false) {
for (Result<AnomalyRecord> record : recordsIterator.next() ) {
recordNormalizables.add(new RecordNormalizable(record.result, record.index));
}
}
return recordNormalizables;
}
private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) {
return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs);
}
private void normalizeBuckets(Normalizer normalizer, List<BucketNormalizable> normalizableBuckets,
String quantilesState, int recordCount, int skipped, int[] counts,
boolean perPartitionNormalization) {
if (shutdown) {
return;
String quantilesState, int[] counts, boolean perPartitionNormalization) {
normalizer.normalize(bucketSpan, perPartitionNormalization, normalizableBuckets, quantilesState);
for (BucketNormalizable bucketNormalizable : normalizableBuckets) {
if (bucketNormalizable.hadBigNormalizedUpdate()) {
updatesPersister.updateBucket(bucketNormalizable);
++counts[0];
} else {
++counts[1];
}
}
LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)",
job.getId(), normalizableBuckets.size(), recordCount, skipped);
List<Normalizable> asNormalizables = normalizableBuckets.stream().collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
for (BucketNormalizable bn : normalizableBuckets) {
updateSingleBucket(counts, bn);
}
updatesPersister.executeRequest(job.getId());
}
private void updateSingleBucket(int[] counts, BucketNormalizable bucketNormalizable) {
if (bucketNormalizable.hadBigNormalizedUpdate()) {
if (perPartitionNormalization) {
List<AnomalyRecord> anomalyRecords = bucketNormalizable.getRecords().stream()
.map(RecordNormalizable::getRecord).collect(Collectors.toList());
PerPartitionMaxProbabilities ppProbs = new PerPartitionMaxProbabilities(anomalyRecords);
updatesPersister.updateResult(ppProbs.getId(), bucketNormalizable.getOriginatingIndex(), ppProbs);
private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false);
while (recordsIterator.hasNext() && shutdown == false) {
Deque<Result<AnomalyRecord>> records = recordsIterator.next();
if (records.isEmpty()) {
LOGGER.debug("[{}] No records to renormalize for job", jobId);
break;
}
updatesPersister.updateBucket(bucketNormalizable);
++counts[0];
} else {
++counts[1];
LOGGER.debug("[{}] Will renormalize a batch of {} records", jobId, records.size());
List<Normalizable> asNormalizables = records.stream()
.map(recordResultIndex -> new RecordNormalizable(recordResultIndex.result, recordResultIndex.index))
.collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
persistChanged(counts, asNormalizables);
}
persistChanged(counts, bucketNormalizable.getRecords());
}
private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(job.getId())
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(jobId)
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
.includeInterim(false);
while (influencersIterator.hasNext() && shutdown == false) {
Deque<Result<Influencer>> influencers = influencersIterator.next();
if (influencers.isEmpty()) {
LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
LOGGER.debug("[{}] No influencers to renormalize for job", jobId);
break;
}
LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());
LOGGER.debug("[{}] Will renormalize a batch of {} influencers", jobId, influencers.size());
List<Normalizable> asNormalizables = influencers.stream()
.map(influencerResultIndex ->
new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index))
.map(influencerResultIndex -> new InfluencerNormalizable(influencerResultIndex.result, influencerResultIndex.index))
.collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
persistChanged(counts, asNormalizables);
}
updatesPersister.executeRequest(job.getId());
}
private void persistChanged(int[] counts, List<? extends Normalizable> asNormalizables) {
@ -242,7 +194,7 @@ public class ScoresUpdater {
return;
}
List<Normalizable> toUpdate = asNormalizables.stream().filter(n -> n.hadBigNormalizedUpdate()).collect(Collectors.toList());
List<Normalizable> toUpdate = asNormalizables.stream().filter(Normalizable::hadBigNormalizedUpdate).collect(Collectors.toList());
counts[0] += toUpdate.size();
counts[1] += asNormalizables.size() - toUpdate.size();

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@ -93,7 +94,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private boolean isInterim;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>(); // Can't use emptyList as might be appended to
private long processingTimeMs;
private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
private List<PartitionScore> partitionScores = Collections.emptyList();
public Bucket(String jobId, Date timestamp, long bucketSpan) {
@ -114,11 +114,9 @@ public class Bucket extends ToXContentToBytes implements Writeable {
this.isInterim = other.isInterim;
this.bucketInfluencers = new ArrayList<>(other.bucketInfluencers);
this.processingTimeMs = other.processingTimeMs;
this.perPartitionMaxProbability = other.perPartitionMaxProbability;
this.partitionScores = new ArrayList<>(other.partitionScores);
}
@SuppressWarnings("unchecked")
public Bucket(StreamInput in) throws IOException {
jobId = in.readString();
timestamp = new Date(in.readLong());
@ -131,7 +129,10 @@ public class Bucket extends ToXContentToBytes implements Writeable {
isInterim = in.readBoolean();
bucketInfluencers = in.readList(BucketInfluencer::new);
processingTimeMs = in.readLong();
perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue();
// bwc for perPartitionMaxProbability
if (in.getVersion().before(Version.V_5_5_0_UNRELEASED)) {
in.readGenericValue();
}
partitionScores = in.readList(PartitionScore::new);
}
@ -148,7 +149,10 @@ public class Bucket extends ToXContentToBytes implements Writeable {
out.writeBoolean(isInterim);
out.writeList(bucketInfluencers);
out.writeLong(processingTimeMs);
out.writeGenericValue(perPartitionMaxProbability);
// bwc for perPartitionMaxProbability
if (out.getVersion().before(Version.V_5_5_0_UNRELEASED)) {
out.writeGenericValue(Collections.emptyMap());
}
out.writeList(partitionScores);
}
@ -290,14 +294,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
partitionScores = Objects.requireNonNull(scores);
}
public Map<String, Double> getPerPartitionMaxProbability() {
return perPartitionMaxProbability;
}
public void setPerPartitionMaxProbability(Map<String, Double> perPartitionMaxProbability) {
this.perPartitionMaxProbability = Objects.requireNonNull(perPartitionMaxProbability);
}
public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();
@ -315,7 +311,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, recordCount, records,
isInterim, bucketSpan, bucketInfluencers);
isInterim, bucketSpan, bucketInfluencers, partitionScores, processingTimeMs);
}
/**
@ -338,18 +334,20 @@ public class Bucket extends ToXContentToBytes implements Writeable {
&& (this.recordCount == that.recordCount)
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers)
&& Objects.equals(this.partitionScores, that.partitionScores)
&& (this.processingTimeMs == that.processingTimeMs);
}
/**
* This method encapsulated the logic for whether a bucket should be
* normalized. Buckets that have no records and a score of
* zero should not be normalized as their score will not change and they
* This method encapsulated the logic for whether a bucket should be normalized.
* Buckets that have a zero anomaly score themselves and no partition scores with
* non-zero score should not be normalized as their score will not change and they
* will just add overhead.
*
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalizable() {
return anomalyScore > 0.0 || recordCount > 0;
return anomalyScore > 0.0 || partitionScores.stream().anyMatch(s -> s.getRecordScore() > 0);
}
}

View File

@ -1,276 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* When per-partition normalization is enabled this class represents
* the max anomalous probabilities of each partition per bucket. These values
* calculated from the bucket's anomaly records.
*/
public class PerPartitionMaxProbabilities extends ToXContentToBytes implements Writeable {
/**
* Result type
*/
public static final String RESULT_TYPE_VALUE = "partition_normalized_probs";
/*
* Field Names
*/
public static final ParseField PER_PARTITION_MAX_PROBABILITIES = new ParseField("per_partition_max_probabilities");
public static final ParseField MAX_RECORD_SCORE = new ParseField("max_record_score");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<PerPartitionMaxProbabilities, Void> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a ->
new PerPartitionMaxProbabilities((String) a[0], (Date) a[1], (long) a[2], (List<PartitionProbability>) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(p.longValue());
} else if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(p.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + p.currentToken() + "] for [" + Result.TIMESTAMP.getPreferredName() + "]");
}, Result.TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), Bucket.BUCKET_SPAN);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), PartitionProbability.PARSER, PER_PARTITION_MAX_PROBABILITIES);
PARSER.declareString((p, s) -> {}, Result.RESULT_TYPE);
}
private final String jobId;
private final Date timestamp;
private final long bucketSpan;
private final List<PartitionProbability> perPartitionMaxProbabilities;
public PerPartitionMaxProbabilities(String jobId, Date timestamp, long bucketSpan,
List<PartitionProbability> partitionProbabilities) {
this.jobId = jobId;
this.timestamp = timestamp;
this.bucketSpan = bucketSpan;
this.perPartitionMaxProbabilities = partitionProbabilities;
}
public PerPartitionMaxProbabilities(List<AnomalyRecord> records) {
if (records.isEmpty()) {
throw new IllegalArgumentException("PerPartitionMaxProbabilities cannot be created from an empty list of records");
}
this.jobId = records.get(0).getJobId();
this.timestamp = records.get(0).getTimestamp();
this.bucketSpan = records.get(0).getBucketSpan();
this.perPartitionMaxProbabilities = calcMaxRecordScorePerPartition(records);
}
public PerPartitionMaxProbabilities(StreamInput in) throws IOException {
jobId = in.readString();
timestamp = new Date(in.readLong());
bucketSpan = in.readLong();
perPartitionMaxProbabilities = in.readList(PartitionProbability::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(timestamp.getTime());
out.writeLong(bucketSpan);
out.writeList(perPartitionMaxProbabilities);
}
public String getJobId() {
return jobId;
}
public String getId() {
return jobId + "_" + RESULT_TYPE_VALUE + "_" + timestamp.getTime() + "_" + bucketSpan;
}
public Date getTimestamp() {
return timestamp;
}
public List<PartitionProbability> getPerPartitionMaxProbabilities() {
return perPartitionMaxProbabilities;
}
public double getMaxProbabilityForPartition(String partitionValue) {
Optional<PartitionProbability> first =
perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst();
return first.isPresent() ? first.get().getMaxRecordScore() : 0.0;
}
/**
* Box class for the stream collector function below
*/
private final class DoubleMaxBox {
private double value = 0.0;
DoubleMaxBox() {
}
public void accept(double d) {
if (d > value) {
value = d;
}
}
public DoubleMaxBox combine(DoubleMaxBox other) {
return (this.value > other.value) ? this : other;
}
public Double value() {
return this.value;
}
}
private List<PartitionProbability> calcMaxRecordScorePerPartition(List<AnomalyRecord> anomalyRecords) {
Map<String, Double> maxValueByPartition = anomalyRecords.stream().collect(
Collectors.groupingBy(AnomalyRecord::getPartitionFieldValue,
Collector.of(DoubleMaxBox::new, (m, ar) -> m.accept(ar.getRecordScore()),
DoubleMaxBox::combine, DoubleMaxBox::value)));
List<PartitionProbability> pProbs = new ArrayList<>();
for (Map.Entry<String, Double> entry : maxValueByPartition.entrySet()) {
pProbs.add(new PartitionProbability(entry.getKey(), entry.getValue()));
}
return pProbs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.dateField(Result.TIMESTAMP.getPreferredName(), Result.TIMESTAMP.getPreferredName() + "_string", timestamp.getTime());
builder.field(Bucket.BUCKET_SPAN.getPreferredName(), bucketSpan);
builder.field(PER_PARTITION_MAX_PROBABILITIES.getPreferredName(), perPartitionMaxProbabilities);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobId, timestamp, perPartitionMaxProbabilities, bucketSpan);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof PerPartitionMaxProbabilities == false) {
return false;
}
PerPartitionMaxProbabilities that = (PerPartitionMaxProbabilities) other;
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.timestamp, that.timestamp)
&& this.bucketSpan == that.bucketSpan
&& Objects.equals(this.perPartitionMaxProbabilities, that.perPartitionMaxProbabilities);
}
/**
* Class for partitionValue, maxRecordScore pairs
*/
public static class PartitionProbability extends ToXContentToBytes implements Writeable {
public static final ConstructingObjectParser<PartitionProbability, Void> PARSER =
new ConstructingObjectParser<>("partitionProbability",
a -> new PartitionProbability((String) a[0], (double) a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), MAX_RECORD_SCORE);
}
private final String partitionValue;
private final double maxRecordScore;
PartitionProbability(String partitionName, double maxRecordScore) {
this.partitionValue = partitionName;
this.maxRecordScore = maxRecordScore;
}
public PartitionProbability(StreamInput in) throws IOException {
partitionValue = in.readString();
maxRecordScore = in.readDouble();
}
public String getPartitionValue() {
return partitionValue;
}
public double getMaxRecordScore() {
return maxRecordScore;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionValue);
out.writeDouble(maxRecordScore);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue)
.field(MAX_RECORD_SCORE.getPreferredName(), maxRecordScore)
.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(partitionValue, maxRecordScore);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof PartitionProbability == false) {
return false;
}
PartitionProbability that = (PartitionProbability) other;
return Objects.equals(this.partitionValue, that.partitionValue)
&& this.maxRecordScore == that.maxRecordScore;
}
}
}

View File

@ -141,9 +141,6 @@ public final class ReservedFieldNames {
ModelSnapshot.LATEST_RESULT_TIME.getPreferredName(),
ModelSnapshot.RETAIN.getPreferredName(),
PerPartitionMaxProbabilities.PER_PARTITION_MAX_PROBABILITIES.getPreferredName(),
PerPartitionMaxProbabilities.MAX_RECORD_SCORE.getPreferredName(),
Result.RESULT_TYPE.getPreferredName(),
Result.TIMESTAMP.getPreferredName(),
Result.IS_INTERIM.getPreferredName()

View File

@ -16,9 +16,7 @@ import org.elasticsearch.xpack.ml.support.AbstractStreamableTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetBucketActionResponseTests extends AbstractStreamableTestCase<GetBucketsAction.Response> {
@ -64,14 +62,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
}
bucket.setPartitionScores(partitionScores);
}
if (randomBoolean()) {
int size = randomInt(10);
Map<String, Double> perPartitionMaxProbability = new HashMap<>(size);
for (int i = 0; i < size; i++) {
perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble());
}
bucket.setPerPartitionMaxProbability(perPartitionMaxProbability);
}
if (randomBoolean()) {
bucket.setProcessingTimeMs(randomLong());
}

View File

@ -129,7 +129,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
Quantiles quantiles = createQuantiles();
builder.addQuantiles(quantiles);
resultProcessor.process(builder.buildTestProcess(), false);
resultProcessor.process(builder.buildTestProcess());
jobResultsPersister.commitResultWrites(JOB_ID);
BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build();
@ -179,7 +179,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
.addFlushAcknowledgement(createFlushAcknowledgement())
.addBucket(nonInterimBucket); // and this will delete the interim results
resultProcessor.process(resultBuilder.buildTestProcess(), false);
resultProcessor.process(resultBuilder.buildTestProcess());
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -212,7 +212,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
.addRecords(finalAnomalyRecords)
.addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records
resultProcessor.process(resultBuilder.buildTestProcess(), false);
resultProcessor.process(resultBuilder.buildTestProcess());
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
@ -237,7 +237,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
.addBucket(bucket) // bucket triggers persistence
.addRecords(secondSetOfRecords);
resultProcessor.process(resultBuilder.buildTestProcess(), false);
resultProcessor.process(resultBuilder.buildTestProcess());
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());

View File

@ -33,7 +33,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase {
BucketNormalizable bn = createBucketNormalizable();
JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister();
persister.updateBucket(bn);
persister.executeRequest("foo");
persister.executeRequest();
assertEquals(0, persister.getBulkRequest().numberOfActions());
}
@ -42,7 +42,7 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase {
when(bulkResponse.hasFailures()).thenReturn(false);
Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build();
return new JobRenormalizedResultsPersister(Settings.EMPTY, client);
return new JobRenormalizedResultsPersister("foo", Settings.EMPTY, client);
}
private BucketNormalizable createBucketNormalizable() {
@ -52,4 +52,4 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase {
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1));
return new BucketNormalizable(bucket, "foo-index");
}
}
}

View File

@ -22,7 +22,6 @@ import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.junit.Before;
import org.mockito.InOrder;
@ -72,7 +71,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.process(process, randomBoolean());
processorUnderTest.process(process);
verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processorUnderTest.completionLatch.getCount());
}
@ -82,7 +81,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
@ -100,7 +99,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = true;
AutodetectResult result = mock(AutodetectResult.class);
Bucket bucket = mock(Bucket.class);
@ -118,7 +117,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
@ -136,7 +135,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
@ -147,7 +146,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getRecords()).thenReturn(records);
processorUnderTest.processResult(context, result);
verify(bulkBuilder, times(1)).persistPerPartitionMaxProbabilities(any(PerPartitionMaxProbabilities.class));
verify(bulkBuilder, times(1)).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verifyNoMoreInteractions(persister);
@ -157,7 +155,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
@ -175,7 +173,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
@ -191,7 +189,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -209,7 +207,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
@ -232,7 +230,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelPlot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelPlot modelPlot = mock(ModelPlot.class);
@ -246,7 +244,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSizeStats() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
@ -261,7 +259,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_modelSnapshot() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
@ -281,7 +279,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testProcessResult_quantiles() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
@ -304,7 +302,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(iterator.next()).thenReturn(autodetectResult);
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.process(process, randomBoolean());
processorUnderTest.process(process);
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());
@ -325,7 +323,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any());
processorUnderTest.process(process, randomBoolean());
processorUnderTest.process(process);
verify(persister, times(2)).persistModelSizeStats(any());
}
@ -338,7 +336,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
processorUnderTest.setProcessKilled();
processorUnderTest.process(process, randomBoolean());
processorUnderTest.process(process);
processorUnderTest.awaitCompletion();
assertEquals(0, processorUnderTest.completionLatch.getCount());

View File

@ -77,7 +77,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
}
public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
expectThrows(UnsupportedOperationException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
}
@ -86,7 +86,7 @@ public class BucketInfluencerNormalizableTests extends ESTestCase {
}
public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class,
expectThrows(UnsupportedOperationException.class,
() -> new BucketInfluencerNormalizable(bucketInfluencer, INDEX_NAME)
.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
}

View File

@ -9,7 +9,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
@ -83,7 +82,7 @@ public class BucketNormalizableTests extends ESTestCase {
}
public void testGetProbability() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).getProbability());
}
public void testGetNormalizedScore() {
@ -101,23 +100,17 @@ public class BucketNormalizableTests extends ESTestCase {
public void testGetChildren() {
BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
.collect(Collectors.toList()));
List<Normalizable> children = bn.getChildren();
assertEquals(6, children.size());
assertEquals(4, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
assertTrue(children.get(2) instanceof RecordNormalizable);
assertEquals(1.0, children.get(2).getNormalizedScore(), EPSILON);
assertTrue(children.get(3) instanceof RecordNormalizable);
assertEquals(2.0, children.get(3).getNormalizedScore(), EPSILON);
assertTrue(children.get(4) instanceof PartitionScoreNormalizable);
assertEquals(0.2, children.get(4).getNormalizedScore(), EPSILON);
assertTrue(children.get(5) instanceof PartitionScoreNormalizable);
assertEquals(0.4, children.get(5).getNormalizedScore(), EPSILON);
assertTrue(children.get(2) instanceof PartitionScoreNormalizable);
assertEquals(0.2, children.get(2).getNormalizedScore(), EPSILON);
assertTrue(children.get(3) instanceof PartitionScoreNormalizable);
assertEquals(0.4, children.get(3).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenTypeBucketInfluencer() {
@ -131,24 +124,11 @@ public class BucketNormalizableTests extends ESTestCase {
assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenTypeRecord() {
BucketNormalizable bn = new BucketNormalizable(bucket, INDEX_NAME);
bn.setRecords(bucket.getRecords().stream().map(r -> new RecordNormalizable(r, INDEX_NAME))
.collect(Collectors.toList()));
List<Normalizable> children = bn.getChildren(Normalizable.ChildType.RECORD);
assertEquals(2, children.size());
assertTrue(children.get(0) instanceof RecordNormalizable);
assertEquals(1.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof RecordNormalizable);
assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
}
public void testSetMaxChildrenScore_GivenDifferentScores() {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertTrue(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 95.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 42.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 42.0));
assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
}
@ -157,13 +137,13 @@ public class BucketNormalizableTests extends ESTestCase {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket, INDEX_NAME);
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 88.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.RECORD, 2.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(Normalizable.ChildType.PARTITION_SCORE, 2.0));
assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
}
public void testSetParentScore() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
expectThrows(UnsupportedOperationException.class, () -> new BucketNormalizable(bucket, INDEX_NAME).setParentScore(42.0));
}
public void testResetBigChangeFlag() {

View File

@ -74,7 +74,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
}
public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
.getChildren(Normalizable.ChildType.BUCKET_INFLUENCER));
}
@ -83,7 +83,7 @@ public class InfluencerNormalizableTests extends ESTestCase {
}
public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
expectThrows(UnsupportedOperationException.class, () -> new InfluencerNormalizable(influencer, INDEX_NAME)
.setMaxChildrenScore(Normalizable.ChildType.BUCKET_INFLUENCER, 42.0));
}

View File

@ -83,6 +83,7 @@ public class ScoresUpdaterTests extends ESTestCase {
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
givenProviderReturnsNoBuckets();
givenProviderReturnsNoRecords();
givenProviderReturnsNoInfluencers();
givenNormalizerFactoryReturnsMock();
givenNormalizerRaisesBigChangeFlag();
@ -153,10 +154,10 @@ public class ScoresUpdaterTests extends ESTestCase {
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyNormalizerWasInvoked(2);
verify(jobRenormalizedResultsPersister, times(1)).updateBucket(any());
verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
verify(jobRenormalizedResultsPersister, times(2)).executeRequest(anyString());
verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
}
public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
@ -229,7 +230,7 @@ public class ScoresUpdaterTests extends ESTestCase {
verifyNormalizerWasInvoked(1);
verify(jobRenormalizedResultsPersister, times(1)).updateResults(any());
verify(jobRenormalizedResultsPersister, times(1)).executeRequest(anyString());
verify(jobRenormalizedResultsPersister, times(1)).executeRequest();
}
public void testUpdate_GivenShutdown() throws IOException {
@ -381,6 +382,10 @@ public class ScoresUpdaterTests extends ESTestCase {
when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
}
private void givenProviderReturnsNoRecords() {
givenProviderReturnsRecords(new ArrayDeque<>());
}
private void givenProviderReturnsRecords(Deque<AnomalyRecord> records) {
Deque<Result<AnomalyRecord>> batch = new ArrayDeque<>();
List<Deque<Result<AnomalyRecord>>> batches = new ArrayList<>();

View File

@ -13,9 +13,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BucketTests extends AbstractSerializingTestCase<Bucket> {
@ -61,14 +59,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
}
bucket.setPartitionScores(partitionScores);
}
if (randomBoolean()) {
int size = randomInt(10);
Map<String, Double> perPartitionMaxProbability = new HashMap<>(size);
for (int i = 0; i < size; i++) {
perPartitionMaxProbability.put(randomAlphaOfLengthBetween(1, 20), randomDouble());
}
bucket.setPerPartitionMaxProbability(perPartitionMaxProbability);
}
if (randomBoolean()) {
bucket.setProcessingTimeMs(randomLong());
}
@ -244,11 +234,11 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertFalse(bucket.isNormalizable());
}
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndPartitionsScoresAreNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(1);
bucket.setPartitionScores(Collections.singletonList(new PartitionScore("n", "v", 50.0, 40.0, 0.01)));
assertTrue(bucket.isNormalizable());
}

View File

@ -1,88 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.results;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCase<PerPartitionMaxProbabilities> {
@Override
protected PerPartitionMaxProbabilities createTestInstance() {
int num = randomIntBetween(1, 10);
List<PerPartitionMaxProbabilities.PartitionProbability> pps = new ArrayList<>();
for (int i=0; i<num; i++) {
pps.add(new PerPartitionMaxProbabilities.PartitionProbability(randomAlphaOfLength(12), randomDouble()));
}
return new PerPartitionMaxProbabilities(randomAlphaOfLength(20), new DateTime(randomDateTimeZone()).toDate(),
randomNonNegativeLong(), pps);
}
@Override
protected Writeable.Reader<PerPartitionMaxProbabilities> instanceReader() {
return PerPartitionMaxProbabilities::new;
}
@Override
protected PerPartitionMaxProbabilities parseInstance(XContentParser parser) {
return PerPartitionMaxProbabilities.PARSER.apply(parser, null);
}
public void testCreateFromAListOfRecords() {
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("A", 20.0));
records.add(createAnomalyRecord("A", 40.0));
records.add(createAnomalyRecord("B", 90.0));
records.add(createAnomalyRecord("B", 15.0));
records.add(createAnomalyRecord("B", 45.0));
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
List<PerPartitionMaxProbabilities.PartitionProbability> pProbs = ppMax.getPerPartitionMaxProbabilities();
assertEquals(2, pProbs.size());
for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
if (pProb.getPartitionValue().equals("A")) {
assertEquals(40.0, pProb.getMaxRecordScore(), 0.0001);
} else {
assertEquals(90.0, pProb.getMaxRecordScore(), 0.0001);
}
}
}
public void testMaxProbabilityForPartition() {
List<AnomalyRecord> records = new ArrayList<>();
records.add(createAnomalyRecord("A", 20.0));
records.add(createAnomalyRecord("A", 40.0));
records.add(createAnomalyRecord("B", 90.0));
records.add(createAnomalyRecord("B", 15.0));
records.add(createAnomalyRecord("B", 45.0));
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities(records);
assertEquals(40.0, ppMax.getMaxProbabilityForPartition("A"), 0.0001);
assertEquals(90.0, ppMax.getMaxProbabilityForPartition("B"), 0.0001);
}
public void testId() {
PerPartitionMaxProbabilities ppMax = new PerPartitionMaxProbabilities("job-foo", new Date(100L), 300L, Collections.emptyList());
assertEquals("job-foo_partition_normalized_probs_100_300", ppMax.getId());
}
private AnomalyRecord createAnomalyRecord(String partitionFieldValue, double recordScore) {
AnomalyRecord record = new AnomalyRecord("foo", new Date(), 600);
record.setPartitionFieldValue(partitionFieldValue);
record.setRecordScore(recordScore);
return record;
}
}