Add back parts of the normalizer functionality (elastic/elasticsearch#526)

* Added back Normalizable classes
* Added back normalization process management classes
* Added back the scores updater

Original commit: elastic/x-pack-elasticsearch@ac8edf6ed6
David Roberts 2016-12-12 12:40:13 +00:00 committed by GitHub
parent bad8a2beb5
commit 1d4df3903c
57 changed files with 2950 additions and 200 deletions

View File

@ -69,6 +69,9 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.BlackHoleAutodetec
import org.elasticsearch.xpack.prelert.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.MultiplyingNormalizerProcess;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.job.scheduler.http.HttpDataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
@ -153,21 +156,27 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
AutodetectProcessFactory processFactory;
AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
try {
NativeController nativeController = new NativeController(env, new NamedPipeHelper());
nativeController.tailLogsInThread();
processFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController);
autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController);
normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController);
} catch (IOException e) {
throw new ElasticsearchException("Failed to create native process factory", e);
throw new ElasticsearchException("Failed to create native process factories", e);
}
} else {
processFactory = (JobDetails, ignoreDowntime, executorService) -> new BlackHoleAutodetectProcess();
autodetectProcessFactory = (jobDetails, ignoreDowntime, executorService) -> new BlackHoleAutodetectProcess();
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);
}
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory, clusterService.getClusterSettings());
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, autodetectProcessFactory,
clusterService.getClusterSettings());
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),

View File

@ -35,7 +35,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectR
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.normalizer.noop.NoOpRenormaliser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.noop.NoOpRenormalizer;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -175,7 +175,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
UsageReporter usageReporter = new UsageReporter(settings, job.getId(), usagePersister);
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(), jobProvider.dataCounts(jobId),
usageReporter, jobDataCountsPersister)) {
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormalizer(), jobResultsPersister, parser);
AutodetectProcess process = null;
try {

View File

@ -125,9 +125,9 @@ public final class Messages {
public static final String JOB_CONFIG_OVERFIELD_NEEDS_ANOTHER = "job.config.overField.needs.another";
public static final String JOB_CONFIG_MULTIPLE_BUCKETSPANS_REQUIRE_BUCKETSPAN = "job.config.multiple.bucketspans.require.bucket_span";
public static final String JOB_CONFIG_MULTIPLE_BUCKETSPANS_MUST_BE_MULTIPLE = "job.config.multiple.bucketspans.must.be.multiple";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD = "job.config.per.partition.normalisation."
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_REQUIRES_PARTITION_FIELD = "job.config.per.partition.normalization."
+ "requires.partition.field";
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS = "job.config.per.partition.normalisation."
public static final String JOB_CONFIG_PER_PARTITION_NORMALIZATION_CANNOT_USE_INFLUENCERS = "job.config.per.partition.normalization."
+ "cannot.use.influencers";

View File

@ -15,8 +15,8 @@ import java.util.Objects;
* <li>Timestamp (Required) - Timestamp of the bucket</li>
* <li>Expand- Include anomaly records. Default= false</li>
* <li>IncludeInterim- Include interim results. Default = false</li>
* <li>partitionValue Set the bucket's max normalised probabiltiy to this
* partiton field value's max normalised probability. Default = null</li>
* <li>partitionValue Set the bucket's max normalized probability to this
* partition field value's max normalized probability. Default = null</li>
* </ul>
*/
public final class BucketQueryBuilder {

View File

@ -29,8 +29,8 @@ import java.util.Objects;
* <li>end- The end bucket timestamp buckets up to but NOT including this
* timestamp are returned. If 0 all buckets from <code>startEpochMs</code> are
* returned. Default = -1</li>
* <li>partitionValue Set the bucket's max normalised probability to this
* partition field value's max normalised probability. Default = null</li>
* <li>partitionValue Set the bucket's max normalized probability to this
* partition field value's max normalized probability. Default = null</li>
* </ul>
*/
public final class BucketsQueryBuilder {

View File

@ -25,8 +25,8 @@ import java.util.Objects;
* <li>end- The end influencer timestamp. Influencers up to but NOT including this
* timestamp are returned. If 0 all influencers from <code>start</code> are
* returned. Default = -1</li>
* <li>partitionValue Set the bucket's max normalised probability to this
* partition field value's max normalised probability. Default = null</li>
* <li>partitionValue Set the bucket's max normalized probability to this
* partition field value's max normalized probability. Default = null</li>
* </ul>
*/
public final class InfluencersQueryBuilder {

View File

@ -299,7 +299,7 @@ public class JobProvider {
}
} else {
List<PerPartitionMaxProbabilities> scores =
partitionMaxNormalisedProbabilities(jobId, query.getStart(), query.getEnd(), query.getPartitionValue());
partitionMaxNormalizedProbabilities(jobId, query.getStart(), query.getEnd(), query.getPartitionValue());
mergePartitionScoresIntoBucket(scores, buckets.results(), query.getPartitionValue());
@ -432,7 +432,7 @@ public class JobProvider {
}
} else {
List<PerPartitionMaxProbabilities> partitionProbs =
partitionMaxNormalisedProbabilities(jobId, query.getTimestamp(), query.getTimestamp() + 1, query.getPartitionValue());
partitionMaxNormalizedProbabilities(jobId, query.getTimestamp(), query.getTimestamp() + 1, query.getPartitionValue());
if (partitionProbs.size() > 1) {
LOGGER.error("Found more than one PerPartitionMaxProbabilities with timestamp [" + query.getTimestamp() + "]" +
@ -454,7 +454,7 @@ public class JobProvider {
}
private List<PerPartitionMaxProbabilities> partitionMaxNormalisedProbabilities(String jobId, Object epochStart, Object epochEnd,
private List<PerPartitionMaxProbabilities> partitionMaxNormalizedProbabilities(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue)
throws ResourceNotFoundException {
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()

View File

@ -17,24 +17,24 @@ import java.util.List;
/**
* Interface for classes that update {@linkplain Bucket Buckets}
* for a particular job with new normalised anomaly scores and
* for a particular job with new normalized anomaly scores and
* unusual scores.
*
* Renormalised results already have an ID having been indexed at least
* Renormalized results already have an ID having been indexed at least
* once before; that same ID should be used on persistence
*/
public class JobRenormaliser extends AbstractComponent {
public class JobRenormalizer extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
public JobRenormaliser(Settings settings, JobResultsPersister jobResultsPersister) {
public JobRenormalizer(Settings settings, JobResultsPersister jobResultsPersister) {
super(settings);
this.jobResultsPersister = jobResultsPersister;
}
/**
* Update the bucket with the changes that may result
* due to renormalisation.
* due to renormalization.
*
* @param bucket the bucket to update
*/
@ -59,10 +59,9 @@ public class JobRenormaliser extends AbstractComponent {
* with the given ID.
*
* @param jobId Id of the job to update
* @param documentId The ID the {@link PerPartitionMaxProbabilities} document should be persisted with
* @param records Source of the new {@link PerPartitionMaxProbabilities} object
*/
public void updatePerPartitionMaxProbabilities(String jobId, String documentId, List<AnomalyRecord> records) {
public void updatePerPartitionMaxProbabilities(String jobId, List<AnomalyRecord> records) {
PerPartitionMaxProbabilities ppMaxProbs = new PerPartitionMaxProbabilities(records);
jobResultsPersister.bulkPersisterBuilder(jobId).persistPerPartitionMaxProbabilities(ppMaxProbs).executeRequest();
}
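
For context, a minimal sketch (hypothetical caller, not part of this commit) of how the renamed JobRenormalizer might be driven once the normalizer process has produced updated scores; the settings, persister, jobId and updatedRecords variables are assumed to exist:

    JobRenormalizer jobRenormalizer = new JobRenormalizer(settings, jobResultsPersister);
    // Rebuild the per-partition max probabilities from the renormalized records; the
    // document is re-persisted so the previously indexed version is overwritten.
    jobRenormalizer.updatePerPartitionMaxProbabilities(jobId, updatedRecords);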

View File

@ -44,7 +44,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
* <b>Anomaly Record</b> Each record was generated by a detector which can be identified via
* the detectorIndex field.
* <b>Influencers</b>
* <b>Quantiles</b> may contain model quantiles used in normalisation and are
* <b>Quantiles</b> may contain model quantiles used in normalization and are
* stored in documents of type {@link Quantiles#TYPE} <br>
* <b>ModelSizeStats</b> This is stored in a flat structure <br>
* <b>ModelSnapShot</b> This is stored in a flat structure <br>

View File

@ -41,7 +41,7 @@ public class ProcessCtrl {
static final String AUTODETECT_PATH = "./" + AUTODETECT;
/**
* The normalisation native program name - always loaded from the same directory as the controller process
* The normalization native program name - always loaded from the same directory as the controller process
*/
public static final String NORMALIZE = "normalize";
static final String NORMALIZE_PATH = "./" + NORMALIZE;
@ -224,7 +224,7 @@ public class ProcessCtrl {
command.add(IGNORE_DOWNTIME_ARG);
}
if (ProcessCtrl.modelConfigFilePresent(env)) {
if (modelConfigFilePresent(env)) {
String modelConfigFile = PrelertPlugin.resolveConfigFile(env, PRELERT_MODEL_CONF).toString();
command.add(MODEL_CONFIG_ARG + modelConfigFile);
}
@ -258,7 +258,7 @@ public class ProcessCtrl {
/**
* Build the command to start the normalizer process.
*/
public static List<String> buildNormaliserCommand(Environment env, String jobId, String quantilesState, Integer bucketSpan,
public static List<String> buildNormalizerCommand(Environment env, String jobId, String quantilesState, Integer bucketSpan,
boolean perPartitionNormalization, long controllerPid) throws IOException {
List<String> command = new ArrayList<>();
@ -271,7 +271,7 @@ public class ProcessCtrl {
}
if (quantilesState != null) {
Path quantilesStateFilePath = writeNormaliserInitState(jobId, quantilesState, env);
Path quantilesStateFilePath = writeNormalizerInitState(jobId, quantilesState, env);
String stateFileArg = QUANTILES_STATE_PATH_ARG + quantilesStateFilePath;
command.add(stateFileArg);
@ -287,9 +287,9 @@ public class ProcessCtrl {
}
/**
* Write the normaliser init state to file.
* Write the normalizer init state to file.
*/
public static Path writeNormaliserInitState(String jobId, String state, Environment env)
public static Path writeNormalizerInitState(String jobId, String state, Environment env)
throws IOException {
// createTempFile has a race condition where it may return the same
// temporary file name to different threads if called simultaneously

View File

@ -92,7 +92,7 @@ public class AutodetectBuilder {
}
/**
* Set quantiles to restore the normaliser state if any.
* Set quantiles to restore the normalizer state if any.
*
* @param quantiles the non-null quantiles
*/
@ -161,10 +161,10 @@ public class AutodetectBuilder {
Quantiles quantiles = this.quantiles.get();
logger.info("Restoring quantiles for job '" + job.getId() + "'");
Path normalisersStateFilePath = ProcessCtrl.writeNormaliserInitState(
Path normalizersStateFilePath = ProcessCtrl.writeNormalizerInitState(
job.getId(), quantiles.getQuantileState(), env);
String quantilesStateFileArg = ProcessCtrl.QUANTILES_STATE_PATH_ARG + normalisersStateFilePath;
String quantilesStateFileArg = ProcessCtrl.QUANTILES_STATE_PATH_ARG + normalizersStateFilePath;
command.add(quantilesStateFileArg);
command.add(ProcessCtrl.DELETE_STATE_FILES_ARG);
}

View File

@ -120,9 +120,9 @@ public class AutodetectCommunicator implements Closeable {
throw ExceptionsHelper.serverError(msg);
}
// We also have to wait for the normaliser to become idle so that we block
// clients from querying results in the middle of normalisation.
autoDetectResultProcessor.waitUntilRenormaliserIsIdle();
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
return null;
}, false);
}

View File

@ -26,7 +26,7 @@ public interface AutodetectProcess extends Closeable {
* encode the record appropriately
* @throws IOException If the write failed
*/
void writeRecord(String [] record) throws IOException;
void writeRecord(String[] record) throws IOException;
/**
* Write the reset buckets control message

View File

@ -45,7 +45,7 @@ class NativeAutodetectProcess implements AutodetectProcess {
private final ZonedDateTime startTime;
private final int numberOfAnalysisFields;
private final List<Path> filesToDelete;
private Future<?> logTailThread;
private Future<?> logTailFuture;
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
InputStream persistStream, int numberOfAnalysisFields, List<Path> filesToDelete,
@ -59,7 +59,7 @@ class NativeAutodetectProcess implements AutodetectProcess {
startTime = ZonedDateTime.now();
this.numberOfAnalysisFields = numberOfAnalysisFields;
this.filesToDelete = filesToDelete;
logTailThread = executorService.submit(() -> {
logTailFuture = executorService.submit(() -> {
try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream();
} catch (IOException e) {
@ -104,15 +104,16 @@ class NativeAutodetectProcess implements AutodetectProcess {
processInStream.close();
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger
// this may take a long time as it persists the model state
logTailThread.get(30, TimeUnit.MINUTES);
logTailFuture.get(30, TimeUnit.MINUTES);
if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
}
LOGGER.info("[{}] Process exited", jobId);
LOGGER.info("[{}] Autodetect process exited", jobId);
} catch (ExecutionException | TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running native process", new Object[] { jobId }, e));
LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process",
new Object[] { jobId }, e));
} catch (InterruptedException e) {
LOGGER.warn("[{}] Exception closing the running native process", jobId);
LOGGER.warn("[{}] Exception closing the running autodetect process", jobId);
Thread.currentThread().interrupt();
} finally {
deleteAssociatedFiles();

View File

@ -111,7 +111,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
} catch (IOException | TimeoutException e) {
String msg = "Failed to launch process for job " + job.getId();
String msg = "Failed to launch autodetect for job " + job.getId();
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}

View File

@ -12,7 +12,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
@ -51,7 +51,7 @@ public class AutoDetectResultProcessor {
private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
private final Renormaliser renormaliser;
private final Renormalizer renormalizer;
private final JobResultsPersister persister;
private final AutodetectResultsParser parser;
@ -60,23 +60,23 @@ public class AutoDetectResultProcessor {
private volatile ModelSizeStats latestModelSizeStats;
public AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser) {
this(renormaliser, persister, parser, new FlushListener());
public AutoDetectResultProcessor(Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser) {
this(renormalizer, persister, parser, new FlushListener());
}
AutoDetectResultProcessor(Renormaliser renormaliser, JobResultsPersister persister, AutodetectResultsParser parser,
AutoDetectResultProcessor(Renormalizer renormalizer, JobResultsPersister persister, AutodetectResultsParser parser,
FlushListener flushListener) {
this.renormaliser = renormaliser;
this.renormalizer = renormalizer;
this.persister = persister;
this.parser = parser;
this.flushListener = flushListener;
}
public void process(String jobId, InputStream in, boolean isPerPartitionNormalisation) {
public void process(String jobId, InputStream in, boolean isPerPartitionNormalization) {
try (Stream<AutodetectResult> stream = parser.parseResults(in)) {
int bucketCount = 0;
Iterator<AutodetectResult> iterator = stream.iterator();
Context context = new Context(jobId, isPerPartitionNormalisation, persister.bulkPersisterBuilder(jobId));
Context context = new Context(jobId, isPerPartitionNormalization, persister.bulkPersisterBuilder(jobId));
while (iterator.hasNext()) {
AutodetectResult result = iterator.next();
processResult(context, result);
@ -92,7 +92,7 @@ public class AutoDetectResultProcessor {
} finally {
completionLatch.countDown();
flushListener.clear();
renormaliser.shutdown();
renormalizer.shutdown();
}
}
@ -151,11 +151,11 @@ public class AutoDetectResultProcessor {
if (quantiles != null) {
persister.persistQuantiles(quantiles);
LOGGER.debug("[{}] Quantiles parsed from output - will " + "trigger renormalisation of scores", context.jobId);
LOGGER.debug("[{}] Quantiles parsed from output - will " + "trigger renormalization of scores", context.jobId);
if (context.isPerPartitionNormalization) {
renormaliser.renormaliseWithPartition(quantiles);
renormalizer.renormalizeWithPartition(quantiles);
} else {
renormaliser.renormalise(quantiles);
renormalizer.renormalize(quantiles);
}
}
FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
@ -193,8 +193,8 @@ public class AutoDetectResultProcessor {
return flushListener.waitForFlush(flushId, timeout.toMillis());
}
public void waitUntilRenormaliserIsIdle() {
renormaliser.waitUntilIdle();
public void waitUntilRenormalizerIsIdle() {
renormalizer.waitUntilIdle();
}
static class Context {

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.Collections;
import java.util.List;
abstract class AbstractLeafNormalizable implements Normalizable {
@Override
public final boolean isContainerOnly() {
return false;
}
@Override
public final List<Integer> getChildrenTypes() {
return Collections.emptyList();
}
@Override
public final List<Normalizable> getChildren() {
return Collections.emptyList();
}
@Override
public final List<Normalizable> getChildren(int type) {
throw new IllegalStateException(getClass().getSimpleName() + " has no children");
}
@Override
public final boolean setMaxChildrenScore(int childrenType, double maxScore) {
throw new IllegalStateException(getClass().getSimpleName() + " has no children");
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import java.util.Objects;
class BucketInfluencerNormalizable extends AbstractLeafNormalizable {
private final BucketInfluencer bucketInfluencer;
public BucketInfluencerNormalizable(BucketInfluencer influencer) {
bucketInfluencer = Objects.requireNonNull(influencer);
}
@Override
public Level getLevel() {
return BucketInfluencer.BUCKET_TIME.equals(bucketInfluencer.getInfluencerFieldName()) ?
Level.ROOT : Level.BUCKET_INFLUENCER;
}
@Override
public String getPartitionFieldName() {
return null;
}
@Override
public String getPartitionFieldValue() {
return null;
}
@Override
public String getPersonFieldName() {
return bucketInfluencer.getInfluencerFieldName();
}
@Override
public String getFunctionName() {
return null;
}
@Override
public String getValueFieldName() {
return null;
}
@Override
public double getProbability() {
return bucketInfluencer.getProbability();
}
@Override
public double getNormalizedScore() {
return bucketInfluencer.getAnomalyScore();
}
@Override
public void setNormalizedScore(double normalizedScore) {
bucketInfluencer.setAnomalyScore(normalizedScore);
}
@Override
public void setParentScore(double parentScore) {
// Do nothing as it is not holding the parent score.
}
@Override
public void resetBigChangeFlag() {
// Do nothing
}
@Override
public void raiseBigChangeFlag() {
// Do nothing
}
}

View File

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
class BucketNormalizable implements Normalizable {
private static final int BUCKET_INFLUENCER = 0;
private static final int RECORD = 1;
private static final int PARTITION_SCORE = 2;
private static final List<Integer> CHILDREN_TYPES =
Arrays.asList(BUCKET_INFLUENCER, RECORD, PARTITION_SCORE);
private final Bucket bucket;
public BucketNormalizable(Bucket bucket) {
this.bucket = Objects.requireNonNull(bucket);
}
@Override
public boolean isContainerOnly() {
return true;
}
@Override
public Level getLevel() {
return Level.ROOT;
}
@Override
public String getPartitionFieldName() {
return null;
}
@Override
public String getPartitionFieldValue() {
return null;
}
@Override
public String getPersonFieldName() {
return null;
}
@Override
public String getFunctionName() {
return null;
}
@Override
public String getValueFieldName() {
return null;
}
@Override
public double getProbability() {
throw new IllegalStateException("Bucket is container only");
}
@Override
public double getNormalizedScore() {
return bucket.getAnomalyScore();
}
@Override
public void setNormalizedScore(double normalizedScore) {
bucket.setAnomalyScore(normalizedScore);
}
@Override
public List<Integer> getChildrenTypes() {
return CHILDREN_TYPES;
}
@Override
public List<Normalizable> getChildren() {
List<Normalizable> children = new ArrayList<>();
for (Integer type : getChildrenTypes()) {
children.addAll(getChildren(type));
}
return children;
}
@Override
public List<Normalizable> getChildren(int type) {
List<Normalizable> children = new ArrayList<>();
switch (type) {
case BUCKET_INFLUENCER:
bucket.getBucketInfluencers().stream().forEach(
influencer -> children.add(new BucketInfluencerNormalizable(influencer)));
break;
case RECORD:
bucket.getRecords().stream().forEach(
record -> children.add(new RecordNormalizable(record)));
break;
case PARTITION_SCORE:
bucket.getPartitionScores().stream().forEach(
partitionScore -> children.add(new PartitionScoreNormalizable(partitionScore)));
break;
default:
throw new IllegalArgumentException("Invalid type: " + type);
}
return children;
}
@Override
public boolean setMaxChildrenScore(int childrenType, double maxScore) {
double oldScore = 0.0;
switch (childrenType) {
case BUCKET_INFLUENCER:
oldScore = bucket.getAnomalyScore();
bucket.setAnomalyScore(maxScore);
break;
case RECORD:
oldScore = bucket.getMaxNormalizedProbability();
bucket.setMaxNormalizedProbability(maxScore);
break;
case PARTITION_SCORE:
break;
default:
throw new IllegalArgumentException("Invalid type: " + childrenType);
}
return maxScore != oldScore;
}
@Override
public void setParentScore(double parentScore) {
throw new IllegalStateException("Bucket has no parent");
}
@Override
public void resetBigChangeFlag() {
bucket.resetBigNormalizedUpdateFlag();
}
@Override
public void raiseBigChangeFlag() {
bucket.raiseBigNormalizedUpdateFlag();
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import java.util.Objects;
class InfluencerNormalizable extends AbstractLeafNormalizable {
private final Influencer influencer;
public InfluencerNormalizable(Influencer influencer) {
this.influencer = Objects.requireNonNull(influencer);
}
@Override
public Level getLevel() {
return Level.INFLUENCER;
}
@Override
public String getPartitionFieldName() {
return null;
}
@Override
public String getPartitionFieldValue() {
return null;
}
@Override
public String getPersonFieldName() {
return influencer.getInfluencerFieldName();
}
@Override
public String getFunctionName() {
return null;
}
@Override
public String getValueFieldName() {
return null;
}
@Override
public double getProbability() {
return influencer.getProbability();
}
@Override
public double getNormalizedScore() {
return influencer.getAnomalyScore();
}
@Override
public void setNormalizedScore(double normalizedScore) {
influencer.setAnomalyScore(normalizedScore);
}
@Override
public void setParentScore(double parentScore) {
throw new IllegalStateException("Influencer has no parent");
}
@Override
public void resetBigChangeFlag() {
influencer.resetBigNormalizedUpdateFlag();
}
@Override
public void raiseBigChangeFlag() {
influencer.raiseBigNormalizedUpdateFlag();
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
/**
* An enumeration of the different normalization levels.
* The string value of each level has to match the equivalent
* level names in the normalizer C++ process.
*/
enum Level {
ROOT("root"),
LEAF("leaf"),
BUCKET_INFLUENCER("inflb"),
INFLUENCER("infl"),
PARTITION("part");
private final String m_Key;
Level(String key) {
m_Key = key;
}
public String asString() {
return m_Key;
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.prelert.job.process.normalizer.output.NormalizerResultHandler;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* Normalizer process that doesn't use native code.
*
* Instead, all scores sent for normalization are multiplied by a supplied factor. Obviously this is useless
* for production operation of the product, but it serves two useful purposes in development:
* - By supplying a factor of 1.0 it can be used as a no-op when native processes are not available
* - It can be used to produce results in testing that do not vary based on changes to the real normalization algorithms
*/
public class MultiplyingNormalizerProcess implements NormalizerProcess {
private static final Logger LOGGER = Loggers.getLogger(MultiplyingNormalizerProcess.class);
private final Settings settings;
private final double factor;
private final PipedInputStream processOutStream;
private XContentBuilder builder;
private boolean shouldIgnoreHeader;
public MultiplyingNormalizerProcess(Settings settings, double factor) {
this.settings = settings;
this.factor = factor;
processOutStream = new PipedInputStream();
try {
XContent xContent = XContentFactory.xContent(XContentType.JSON);
PipedOutputStream processInStream = new PipedOutputStream(processOutStream);
builder = new XContentBuilder(xContent, processInStream);
} catch (IOException e) {
LOGGER.error("Could not set up no-op pipe", e);
}
shouldIgnoreHeader = true;
}
@Override
public void writeRecord(String[] record) throws IOException {
if (shouldIgnoreHeader) {
shouldIgnoreHeader = false;
return;
}
NormalizerResult result = new NormalizerResult();
try {
// This isn't great as the order must match the order in Normalizer.normalize(),
// but it's only for developers who cannot run the native processes
result.setLevel(record[0]);
result.setPartitionFieldName(record[1]);
result.setPartitionFieldValue(record[2]);
result.setPersonFieldName(record[3]);
result.setFunctionName(record[4]);
result.setValueFieldName(record[5]);
result.setProbability(Double.parseDouble(record[6]));
result.setNormalizedScore(factor * Double.parseDouble(record[7]));
} catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
throw new IOException("Unable to write to no-op normalizer", e);
}
// Write lineified JSON
builder.lfAtEnd();
result.toXContent(builder, null);
}
@Override
public void close() throws IOException {
builder.close();
}
@Override
public NormalizerResultHandler createNormalizedResultsHandler() {
return new NormalizerResultHandler(settings, processOutStream);
}
@Override
public boolean isProcessAlive() {
// There is no external process involved, so it is always considered alive
return true;
}
@Override
public String readError() {
return "";
}
}
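
As a rough illustration of the development-only process described above (a hypothetical snippet, not part of the commit), writing one record through a MultiplyingNormalizerProcess with a factor of 2.0 should produce a normalized score twice the input; the first record written is treated as the CSV header and ignored:

    MultiplyingNormalizerProcess process = new MultiplyingNormalizerProcess(Settings.EMPTY, 2.0);
    NormalizerResultHandler handler = process.createNormalizedResultsHandler();
    // header record - skipped by the process
    process.writeRecord(new String[] { "level", "partition_field_name", "partition_field_value",
            "person_field_name", "function_name", "value_field_name", "probability", "normalized_score" });
    // one data record: probability 0.01, current score 30.0
    process.writeRecord(new String[] { "root", "", "", "bucket_time", "", "", "0.01", "30.0" });
    process.close();
    handler.process(); // reads the piped JSON; the single parsed result should carry a score of 60.0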

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
public class NativeNormalizerFactory implements NormalizerFactory {
private final NormalizerProcessFactory processFactory;
private final ExecutorService executorService;
public NativeNormalizerFactory(NormalizerProcessFactory processFactory, ExecutorService executorService) {
this.processFactory = Objects.requireNonNull(processFactory);
this.executorService = Objects.requireNonNull(executorService);
}
@Override
public Normalizer create(String jobId) {
return new Normalizer(jobId, processFactory, executorService);
}
}

View File

@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.xpack.prelert.job.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.prelert.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.prelert.job.process.normalizer.output.NormalizerResultHandler;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Normalizer process using native code.
*/
class NativeNormalizerProcess implements NormalizerProcess {
private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcess.class);
private final String jobId;
private final Settings settings;
private final CppLogMessageHandler cppLogHandler;
private final OutputStream processInStream;
private final InputStream processOutStream;
private final LengthEncodedWriter recordWriter;
private Future<?> logTailThread;
NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, ExecutorService executorService) throws EsRejectedExecutionException {
this.jobId = jobId;
this.settings = settings;
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = new BufferedOutputStream(processInStream);
this.processOutStream = processOutStream;
this.recordWriter = new LengthEncodedWriter(this.processInStream);
logTailThread = executorService.submit(() -> {
try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error tailing C++ process logs", new Object[] { jobId }), e);
}
});
}
@Override
public void writeRecord(String[] record) throws IOException {
recordWriter.writeRecord(record);
}
@Override
public void close() throws IOException {
try {
// closing its input causes the process to exit
processInStream.close();
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger
// this may take a long time as it persists the model state
logTailThread.get(5, TimeUnit.MINUTES);
if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
}
LOGGER.info("[{}] Normalizer process exited", jobId);
} catch (ExecutionException | TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running normalizer process", new Object[] { jobId }), e);
} catch (InterruptedException e) {
LOGGER.warn("[{}] Exception closing the running normalizer process", jobId);
Thread.currentThread().interrupt();
}
}
@Override
public NormalizerResultHandler createNormalizedResultsHandler() {
return new NormalizerResultHandler(settings, processOutStream);
}
@Override
public boolean isProcessAlive() {
// Sanity check: make sure the process hasn't terminated already
return !cppLogHandler.hasLogStreamEnded();
}
@Override
public String readError() {
return cppLogHandler.getErrors();
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.prelert.job.process.NativeController;
import org.elasticsearch.xpack.prelert.job.process.ProcessCtrl;
import org.elasticsearch.xpack.prelert.job.process.ProcessPipes;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.NamedPipeHelper;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
public class NativeNormalizerProcessFactory implements NormalizerProcessFactory {
private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcessFactory.class);
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(2);
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
public NativeNormalizerProcessFactory(Environment env, Settings settings, NativeController nativeController) {
this.env = Objects.requireNonNull(env);
this.settings = Objects.requireNonNull(settings);
this.nativeController = Objects.requireNonNull(nativeController);
}
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
boolean perPartitionNormalization, ExecutorService executorService) {
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, ProcessCtrl.NORMALIZE, jobId,
true, false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan, perPartitionNormalization);
return new NativeNormalizerProcess(jobId, settings, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), executorService);
}
private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan,
boolean perPartitionNormalization) {
try {
List<String> command = ProcessCtrl.buildNormalizerCommand(env, jobId, quantilesState, bucketSpan,
perPartitionNormalization, nativeController.getPid());
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
} catch (IOException | TimeoutException e) {
String msg = "Failed to launch normalizer for job " + jobId;
LOGGER.error(msg);
throw ExceptionsHelper.serverError(msg, e);
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.List;
interface Normalizable {
/**
* A {@code Normalizable} may be the owner of scores or just a
* container of other {@code Normalizable} objects. A container only
* {@code Normalizable} does not have any scores to be normalized.
* It contains scores that are aggregates of its children.
*
* @return true if this {@code Normalizable} is only a container
*/
boolean isContainerOnly();
Level getLevel();
String getPartitionFieldName();
String getPartitionFieldValue();
String getPersonFieldName();
String getFunctionName();
String getValueFieldName();
double getProbability();
double getNormalizedScore();
void setNormalizedScore(double normalizedScore);
List<Integer> getChildrenTypes();
List<Normalizable> getChildren();
List<Normalizable> getChildren(int type);
/**
* Set the aggregate normalized score for a type of children
*
* @param childrenType the integer that corresponds to a children type
* @param maxScore the aggregate normalized score of the children
* @return true if the score has changed or false otherwise
*/
boolean setMaxChildrenScore(int childrenType, double maxScore);
/**
* If this {@code Normalizable} holds the score of its parent,
* set the parent score
*
* @param parentScore the score of the parent {@code Normalizable}
*/
void setParentScore(double parentScore);
void resetBigChangeFlag();
void raiseBigChangeFlag();
}

View File

@ -0,0 +1,215 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.process.normalizer.output.NormalizerResultHandler;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* Normalizes probabilities to scores in the range 0-100.
* <br>
* Creates and initializes the normalizer process, pipes the probabilities
* through it and adds the normalized values to the records/buckets.
* <br>
* Relies on the C++ normalizer process returning an answer for every input
* and in exactly the same order as the inputs.
*/
public class Normalizer {
private static final Logger LOGGER = Loggers.getLogger(Normalizer.class);
private final String jobId;
private final NormalizerProcessFactory processFactory;
private final ExecutorService executorService;
public Normalizer(String jobId, NormalizerProcessFactory processFactory, ExecutorService executorService) {
this.jobId = jobId;
this.processFactory = processFactory;
this.executorService = executorService;
}
/**
* Launches a normalization process seeded with the quantiles state provided
* and normalizes the given results.
*
* @param bucketSpan If <code>null</code> the default is used
* @param perPartitionNormalization Is normalization per partition (rather than per job)?
* @param results Will be updated with the normalized results
* @param quantilesState The state to be used to seed the system change
* normalizer
*/
public void normalize(Integer bucketSpan, boolean perPartitionNormalization,
List<Normalizable> results, String quantilesState) {
NormalizerProcess process = processFactory.createNormalizerProcess(jobId, quantilesState, bucketSpan,
perPartitionNormalization, executorService);
NormalizerResultHandler resultsHandler = process.createNormalizedResultsHandler();
Future<?> resultsHandlerFuture = executorService.submit(() -> {
try {
resultsHandler.process();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error reading normalizer results", new Object[] { jobId }), e);
}
});
try {
process.writeRecord(new String[] {
NormalizerResult.LEVEL_FIELD.getPreferredName(),
NormalizerResult.PARTITION_FIELD_NAME_FIELD.getPreferredName(),
NormalizerResult.PARTITION_FIELD_VALUE_FIELD.getPreferredName(),
NormalizerResult.PERSON_FIELD_NAME_FIELD.getPreferredName(),
NormalizerResult.FUNCTION_NAME_FIELD.getPreferredName(),
NormalizerResult.VALUE_FIELD_NAME_FIELD.getPreferredName(),
NormalizerResult.PROBABILITY_FIELD.getPreferredName(),
NormalizerResult.NORMALIZED_SCORE_FIELD.getPreferredName()
});
for (Normalizable result : results) {
writeNormalizableAndChildrenRecursively(result, process);
}
} catch (IOException e) {
LOGGER.error("[" + jobId + "] Error writing to the normalizer", e);
} finally {
try {
process.close();
} catch (IOException e) {
LOGGER.error("[" + jobId + "] Error closing normalizer", e);
}
}
// Wait for the results handler to finish
try {
resultsHandlerFuture.get();
mergeNormalizedScoresIntoResults(resultsHandler.getNormalizedResults(), results);
} catch (ExecutionException e) {
LOGGER.error(new ParameterizedMessage("[{}] Error processing normalizer results", new Object[] { jobId }), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void writeNormalizableAndChildrenRecursively(Normalizable normalizable,
NormalizerProcess process) throws IOException {
if (normalizable.isContainerOnly() == false) {
process.writeRecord(new String[] {
normalizable.getLevel().asString(),
Strings.coalesceToEmpty(normalizable.getPartitionFieldName()),
Strings.coalesceToEmpty(normalizable.getPartitionFieldValue()),
Strings.coalesceToEmpty(normalizable.getPersonFieldName()),
Strings.coalesceToEmpty(normalizable.getFunctionName()),
Strings.coalesceToEmpty(normalizable.getValueFieldName()),
Double.toString(normalizable.getProbability()),
Double.toString(normalizable.getNormalizedScore())
});
}
for (Normalizable child : normalizable.getChildren()) {
writeNormalizableAndChildrenRecursively(child, process);
}
}
/**
* Updates the normalized scores on the results.
*/
private void mergeNormalizedScoresIntoResults(List<NormalizerResult> normalizedScores,
List<Normalizable> results) {
Iterator<NormalizerResult> scoresIter = normalizedScores.iterator();
for (Normalizable result : results) {
mergeRecursively(scoresIter, null, false, result);
}
if (scoresIter.hasNext()) {
LOGGER.error("[{}] Unused normalized scores remain after updating all results: {} for {}",
jobId, normalizedScores.size(), results.size());
}
}
/**
* Recursively merges the scores returned by the normalization process into the results
*
* @param scoresIter an Iterator of the scores returned by the normalization process
* @param parent the parent result
* @param parentHadBigChange whether the parent had a big change
* @param result the result to be updated
* @return the effective normalized score of the given result
*/
private double mergeRecursively(Iterator<NormalizerResult> scoresIter, Normalizable parent,
boolean parentHadBigChange, Normalizable result) {
boolean hasBigChange = false;
if (result.isContainerOnly() == false) {
if (!scoresIter.hasNext()) {
String msg = "Error iterating normalized results";
LOGGER.error("[{}] {}", jobId, msg);
throw new ElasticsearchException(msg);
}
result.resetBigChangeFlag();
if (parent != null && parentHadBigChange) {
result.setParentScore(parent.getNormalizedScore());
result.raiseBigChangeFlag();
}
double normalizedScore = scoresIter.next().getNormalizedScore();
hasBigChange = isBigUpdate(result.getNormalizedScore(), normalizedScore);
if (hasBigChange) {
result.setNormalizedScore(normalizedScore);
result.raiseBigChangeFlag();
if (parent != null) {
parent.raiseBigChangeFlag();
}
}
}
for (Integer childrenType : result.getChildrenTypes()) {
List<Normalizable> children = result.getChildren(childrenType);
if (!children.isEmpty()) {
double maxChildrenScore = 0.0;
for (Normalizable child : children) {
maxChildrenScore = Math.max(
mergeRecursively(scoresIter, result, hasBigChange, child),
maxChildrenScore);
}
hasBigChange |= result.setMaxChildrenScore(childrenType, maxChildrenScore);
}
}
return result.getNormalizedScore();
}
/**
* Encapsulate the logic for deciding whether a change to a normalized score
* is "big".
* <p>
* Current logic is that a big change is a change of at least 1 or more
* than 50% of the higher of the two values.
*
* @param oldVal The old value of the normalized score
* @param newVal The new value of the normalized score
* @return true if the update is considered "big"
*/
private static boolean isBigUpdate(double oldVal, double newVal) {
if (Math.abs(oldVal - newVal) >= 1.0) {
return true;
}
if (oldVal > newVal) {
if (oldVal * 0.5 > newVal) {
return true;
}
} else {
if (newVal * 0.5 > oldVal) {
return true;
}
}
return false;
}
}
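
Pulling the new classes together, a minimal sketch (assumed wiring, not part of this commit) of how a caller might drive the normalizer: a factory creates a Normalizer for a job, buckets are wrapped as Normalizables, and normalize() pushes them through the native (or multiplying) process so the wrapped results end up with updated scores. The normalizerProcessFactory, executorService, jobId, buckets, bucketSpan, perPartitionNormalization and quantilesState variables are assumed to exist:

    NormalizerFactory normalizerFactory = new NativeNormalizerFactory(normalizerProcessFactory, executorService);
    Normalizer normalizer = normalizerFactory.create(jobId);
    List<Normalizable> normalizables = new ArrayList<>();
    for (Bucket bucket : buckets) {
        // container-only wrapper; its bucket influencers, records and partition scores become children
        normalizables.add(new BucketNormalizable(bucket));
    }
    // bucketSpan may be null (default); quantilesState seeds the normalizer process
    normalizer.normalize(bucketSpan, perPartitionNormalization, normalizables, quantilesState);
    // the underlying buckets now hold the renormalized anomaly scores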

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
/**
* Factory interface for creating implementations of {@link Normalizer}
*/
public interface NormalizerFactory {
/**
* Create an implementation of {@link Normalizer}
*
* @param jobId The job ID
* @return The normalizer
*/
Normalizer create(String jobId);
}

View File

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.process.normalizer.output.NormalizerResultHandler;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
/**
* Interface representing the native C++ normalizer process
*/
public interface NormalizerProcess extends Closeable {
/**
* Write the record to the normalizer. The record parameter should not be encoded
* (i.e. length encoded); the implementation will apply the correct encoding.
*
* @param record Plain array of strings, implementors of this class should
* encode the record appropriately
* @throws IOException If the write failed
*/
void writeRecord(String[] record) throws IOException;
/**
* Create a result handler for this process's results.
* @return results handler
*/
NormalizerResultHandler createNormalizedResultsHandler();
/**
* Returns true if the process is still running.
* @return True if the process is still running
*/
boolean isProcessAlive();
/**
* Read any content in the error output buffer.
* @return An error message or empty String if no error.
*/
String readError();
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.concurrent.ExecutorService;
/**
* Factory interface for creating implementations of {@link NormalizerProcess}
*/
public interface NormalizerProcessFactory {
/**
* Create an implementation of {@link NormalizerProcess}
*
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
* @return The process
*/
NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan, boolean perPartitionNormalization,
ExecutorService executorService);
}

View File

@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
/**
* Parse the output of the normalizer process, for example:
*
* {"probability":0.01,"normalized_score":2.2}
*/
public class NormalizerResult extends ToXContentToBytes implements Writeable {
static final ParseField LEVEL_FIELD = new ParseField("level");
static final ParseField PARTITION_FIELD_NAME_FIELD = new ParseField("partition_field_name");
static final ParseField PARTITION_FIELD_VALUE_FIELD = new ParseField("partition_field_value");
static final ParseField PERSON_FIELD_NAME_FIELD = new ParseField("person_field_name");
static final ParseField FUNCTION_NAME_FIELD = new ParseField("function_name");
static final ParseField VALUE_FIELD_NAME_FIELD = new ParseField("value_field_name");
static final ParseField PROBABILITY_FIELD = new ParseField("probability");
static final ParseField NORMALIZED_SCORE_FIELD = new ParseField("normalized_score");
public static final ObjectParser<NormalizerResult, ParseFieldMatcherSupplier> PARSER = new ObjectParser<>(
LEVEL_FIELD.getPreferredName(), NormalizerResult::new);
static {
PARSER.declareString(NormalizerResult::setLevel, LEVEL_FIELD);
PARSER.declareString(NormalizerResult::setPartitionFieldName, PARTITION_FIELD_NAME_FIELD);
PARSER.declareString(NormalizerResult::setPartitionFieldValue, PARTITION_FIELD_VALUE_FIELD);
PARSER.declareString(NormalizerResult::setPersonFieldName, PERSON_FIELD_NAME_FIELD);
PARSER.declareString(NormalizerResult::setFunctionName, FUNCTION_NAME_FIELD);
PARSER.declareString(NormalizerResult::setValueFieldName, VALUE_FIELD_NAME_FIELD);
PARSER.declareDouble(NormalizerResult::setProbability, PROBABILITY_FIELD);
PARSER.declareDouble(NormalizerResult::setNormalizedScore, NORMALIZED_SCORE_FIELD);
}
private String level;
private String partitionFieldName;
private String partitionFieldValue;
private String personFieldName;
private String functionName;
private String valueFieldName;
private double probability;
private double normalizedScore;
public NormalizerResult() {
}
public NormalizerResult(StreamInput in) throws IOException {
level = in.readOptionalString();
partitionFieldName = in.readOptionalString();
partitionFieldValue = in.readOptionalString();
personFieldName = in.readOptionalString();
functionName = in.readOptionalString();
valueFieldName = in.readOptionalString();
probability = in.readDouble();
normalizedScore = in.readDouble();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(level);
out.writeOptionalString(partitionFieldName);
out.writeOptionalString(partitionFieldValue);
out.writeOptionalString(personFieldName);
out.writeOptionalString(functionName);
out.writeOptionalString(valueFieldName);
out.writeDouble(probability);
out.writeDouble(normalizedScore);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(LEVEL_FIELD.getPreferredName(), level);
builder.field(PARTITION_FIELD_NAME_FIELD.getPreferredName(), partitionFieldName);
builder.field(PARTITION_FIELD_VALUE_FIELD.getPreferredName(), partitionFieldValue);
builder.field(PERSON_FIELD_NAME_FIELD.getPreferredName(), personFieldName);
builder.field(FUNCTION_NAME_FIELD.getPreferredName(), functionName);
builder.field(VALUE_FIELD_NAME_FIELD.getPreferredName(), valueFieldName);
builder.field(PROBABILITY_FIELD.getPreferredName(), probability);
builder.field(NORMALIZED_SCORE_FIELD.getPreferredName(), normalizedScore);
builder.endObject();
return builder;
}
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
public String getPartitionFieldName() {
return partitionFieldName;
}
public void setPartitionFieldName(String partitionFieldName) {
this.partitionFieldName = partitionFieldName;
}
public String getPartitionFieldValue() {
return partitionFieldValue;
}
public void setPartitionFieldValue(String partitionFieldValue) {
this.partitionFieldValue = partitionFieldValue;
}
public String getPersonFieldName() {
return personFieldName;
}
public void setPersonFieldName(String personFieldName) {
this.personFieldName = personFieldName;
}
public String getFunctionName() {
return functionName;
}
public void setFunctionName(String functionName) {
this.functionName = functionName;
}
public String getValueFieldName() {
return valueFieldName;
}
public void setValueFieldName(String valueFieldName) {
this.valueFieldName = valueFieldName;
}
public double getProbability() {
return probability;
}
public void setProbability(double probability) {
this.probability = probability;
}
public double getNormalizedScore() {
return normalizedScore;
}
public void setNormalizedScore(double normalizedScore) {
this.normalizedScore = normalizedScore;
}
@Override
public int hashCode() {
return Objects.hash(level, partitionFieldName, partitionFieldValue, personFieldName,
functionName, valueFieldName, probability, normalizedScore);
}
/**
* Compare all the fields.
*/
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof NormalizerResult)) {
return false;
}
NormalizerResult that = (NormalizerResult)other;
return Objects.equals(this.level, that.level)
&& Objects.equals(this.partitionFieldName, that.partitionFieldName)
&& Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
&& Objects.equals(this.personFieldName, that.personFieldName)
&& Objects.equals(this.functionName, that.functionName)
&& Objects.equals(this.valueFieldName, that.valueFieldName)
&& this.probability == that.probability
&& this.normalizedScore == that.normalizedScore;
}
}
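
A rough usage sketch (not part of this commit): a single line of normalizer output can be turned into a NormalizerResult with the PARSER declared above, reusing the JSON XContent machinery that NormalizerResultHandler further down in this diff relies on. The literal input line and the strict ParseFieldMatcher are assumptions for the illustration.

// Hedged sketch: parse one line of normalizer output into a NormalizerResult.
XContent xContent = XContentFactory.xContent(XContentType.JSON);
BytesArray line = new BytesArray("{\"probability\":0.01,\"normalized_score\":2.2}");
try (XContentParser parser = xContent.createParser(line)) {
    // PARSER sets only the fields present in the JSON; the rest keep their defaults
    NormalizerResult result = NormalizerResult.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
    assert result.getProbability() == 0.01;
    assert result.getNormalizedScore() == 2.2;
}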

View File

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.results.PartitionScore;
import java.util.Objects;
public class PartitionScoreNormalizable extends AbstractLeafNormalizable {
private final PartitionScore score;
public PartitionScoreNormalizable(PartitionScore score) {
this.score = Objects.requireNonNull(score);
}
@Override
public Level getLevel() {
return Level.PARTITION;
}
@Override
public String getPartitionFieldName() {
return score.getPartitionFieldName();
}
@Override
public String getPartitionFieldValue() {
return score.getPartitionFieldValue();
}
@Override
public String getPersonFieldName() {
return null;
}
@Override
public String getFunctionName() {
return null;
}
@Override
public String getValueFieldName() {
return null;
}
@Override
public double getProbability() {
return score.getProbability();
}
@Override
public double getNormalizedScore() {
return score.getAnomalyScore();
}
@Override
public void setNormalizedScore(double normalizedScore) {
score.setAnomalyScore(normalizedScore);
}
@Override
public void setParentScore(double parentScore) {
// Do nothing as it is not holding the parent score.
}
@Override
public void resetBigChangeFlag() {
score.resetBigNormalizedUpdateFlag();
}
@Override
public void raiseBigChangeFlag() {
score.raiseBigNormalizedUpdateFlag();
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import java.util.Objects;
class RecordNormalizable extends AbstractLeafNormalizable {
private final AnomalyRecord record;
public RecordNormalizable(AnomalyRecord record) {
this.record = Objects.requireNonNull(record);
}
@Override
public Level getLevel() {
return Level.LEAF;
}
@Override
public String getPartitionFieldName() {
return record.getPartitionFieldName();
}
@Override
public String getPartitionFieldValue() {
return record.getPartitionFieldValue();
}
@Override
public String getPersonFieldName() {
String over = record.getOverFieldName();
return over != null ? over : record.getByFieldName();
}
@Override
public String getFunctionName() {
return record.getFunction();
}
@Override
public String getValueFieldName() {
return record.getFieldName();
}
@Override
public double getProbability() {
return record.getProbability();
}
@Override
public double getNormalizedScore() {
return record.getNormalizedProbability();
}
@Override
public void setNormalizedScore(double normalizedScore) {
record.setNormalizedProbability(normalizedScore);
}
@Override
public void setParentScore(double parentScore) {
record.setAnomalyScore(parentScore);
}
@Override
public void resetBigChangeFlag() {
record.resetBigNormalizedUpdateFlag();
}
@Override
public void raiseBigChangeFlag() {
record.raiseBigNormalizedUpdateFlag();
}
}

View File

@ -7,28 +7,28 @@ package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
public interface Renormaliser {
public interface Renormalizer {
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
void renormalise(Quantiles quantiles);
void renormalize(Quantiles quantiles);
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records and aggregate records to the partition
* level
*/
void renormaliseWithPartition(Quantiles quantiles);
void renormalizeWithPartition(Quantiles quantiles);
/**
* Blocks until the renormaliser is idle and no further normalisation tasks are pending.
* Blocks until the renormalizer is idle and no further normalization tasks are pending.
*/
void waitUntilIdle();
/**
* Shut down the renormaliser
* Shut down the renormalizer
*/
boolean shutdown();
}
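
As a hedged illustration of the contract (not code from this commit), a caller could drive the interface as below. The Quantiles instance is mocked purely to keep the snippet self-contained, mirroring the Mockito style of the tests later in this diff, and NoOpRenormalizer is the placeholder implementation renamed further down.

// Hedged sketch: exercise the Renormalizer contract end to end.
Quantiles quantiles = mock(Quantiles.class);
Renormalizer renormalizer = new NoOpRenormalizer();
renormalizer.renormalize(quantiles);   // update anomaly scores on persisted buckets and their records
renormalizer.waitUntilIdle();          // block until no further normalization tasks are pending
renormalizer.shutdown();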

View File

@ -6,6 +6,6 @@
package org.elasticsearch.xpack.prelert.job.process.normalizer;
public interface RenormaliserFactory {
Renormaliser create(String jobId);
public interface RenormalizerFactory {
Renormalizer create(String jobId);
}

View File

@ -0,0 +1,242 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Thread safe class that updates the scores of all existing results
* with the renormalized scores
*/
class ScoresUpdater {
private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class);
/**
* Target number of records to renormalize at a time
*/
private static final int TARGET_RECORDS_TO_RENORMALIZE = 100000;
// 30 days
private static final long DEFAULT_RENORMALIZATION_WINDOW_MS = 2592000000L;
private static final int DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW = 100;
private static final long SECONDS_IN_DAY = 86400;
private static final long MILLISECONDS_IN_SECOND = 1000;
private final Job job;
private final JobProvider jobProvider;
private final JobRenormalizer updatesPersister;
private final NormalizerFactory normalizerFactory;
private int bucketSpan;
private long normalizationWindow;
private boolean perPartitionNormalization;
public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizer jobRenormalizer, NormalizerFactory normalizerFactory) {
this.job = job;
this.jobProvider = Objects.requireNonNull(jobProvider);
updatesPersister = Objects.requireNonNull(jobRenormalizer);
this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
bucketSpan = getBucketSpanOrDefault(job.getAnalysisConfig());
normalizationWindow = getNormalizationWindowOrDefault(job);
perPartitionNormalization = getPerPartitionNormalizationOrDefault(job.getAnalysisConfig());
}
private static int getBucketSpanOrDefault(AnalysisConfig analysisConfig) {
if (analysisConfig != null && analysisConfig.getBucketSpan() != null) {
return analysisConfig.getBucketSpan().intValue();
}
// A bucketSpan value of 0 will result in the default
// bucketSpan value being used in the back-end.
return 0;
}
private long getNormalizationWindowOrDefault(Job job) {
if (job.getRenormalizationWindowDays() != null) {
return job.getRenormalizationWindowDays() * SECONDS_IN_DAY * MILLISECONDS_IN_SECOND;
}
return Math.max(DEFAULT_RENORMALIZATION_WINDOW_MS,
DEFAULT_BUCKETS_IN_RENORMALIZATION_WINDOW * bucketSpan * MILLISECONDS_IN_SECOND);
}
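// Worked example (illustrative, not in the original source): for a bucketSpan of 3,600 seconds
// the bucket-based alternative is 100 * 3600 * 1000 = 360,000,000 ms (about 4.2 days), so the
// Math.max above keeps the 30-day default of 2,592,000,000 ms; only bucket spans above
// 25,920 seconds (7.2 hours) would widen the window beyond 30 days.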
private static boolean getPerPartitionNormalizationOrDefault(AnalysisConfig analysisConfig) {
return (analysisConfig != null) && analysisConfig.getUsePerPartitionNormalization();
}
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
public void update(String quantilesState, long endBucketEpochMs, long windowExtensionMs, boolean perPartitionNormalization) {
Normalizer normalizer = normalizerFactory.create(job.getId());
int[] counts = {0, 0};
updateBuckets(normalizer, quantilesState, endBucketEpochMs,
windowExtensionMs, counts, perPartitionNormalization);
updateInfluencers(normalizer, quantilesState, endBucketEpochMs,
windowExtensionMs, counts);
LOGGER.info("[{}] Normalization resulted in: {} updates, {} no-ops", job.getId(), counts[0], counts[1]);
}
private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
BatchedDocumentsIterator<Bucket> bucketsIterator =
jobProvider.newBatchedBucketsIterator(job.getId())
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
// Make a list of buckets with their records to be renormalized.
// This may be shorter than the original list of buckets for two
// reasons:
// 1) We don't bother with buckets that have raw score 0 and no
// records
// 2) We limit the total number of records to be not much more
// than 100000
List<Bucket> bucketsToRenormalize = new ArrayList<>();
int batchRecordCount = 0;
int skipped = 0;
while (bucketsIterator.hasNext()) {
// Get a batch of buckets without their records to calculate
// how many buckets can be sensibly retrieved
Deque<Bucket> buckets = bucketsIterator.next();
if (buckets.isEmpty()) {
LOGGER.debug("[{}] No buckets to renormalize for job", job.getId());
break;
}
while (!buckets.isEmpty()) {
Bucket currentBucket = buckets.removeFirst();
if (currentBucket.isNormalizable()) {
bucketsToRenormalize.add(currentBucket);
batchRecordCount += jobProvider.expandBucket(job.getId(), false, currentBucket);
} else {
++skipped;
}
if (batchRecordCount >= TARGET_RECORDS_TO_RENORMALIZE) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
batchRecordCount, skipped, counts, perPartitionNormalization);
bucketsToRenormalize = new ArrayList<>();
batchRecordCount = 0;
skipped = 0;
}
}
}
if (!bucketsToRenormalize.isEmpty()) {
normalizeBuckets(normalizer, bucketsToRenormalize, quantilesState,
batchRecordCount, skipped, counts, perPartitionNormalization);
}
}
private long calcNormalizationWindowStart(long endEpochMs, long windowExtensionMs) {
return Math.max(0, endEpochMs - normalizationWindow - windowExtensionMs);
}
private void normalizeBuckets(Normalizer normalizer, List<Bucket> buckets, String quantilesState,
int recordCount, int skipped, int[] counts, boolean perPartitionNormalization) {
LOGGER.debug("[{}] Will renormalize a batch of {} buckets with {} records ({} empty buckets skipped)",
job.getId(), buckets.size(), recordCount, skipped);
List<Normalizable> asNormalizables = buckets.stream()
.map(bucket -> new BucketNormalizable(bucket)).collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
for (Bucket bucket : buckets) {
updateSingleBucket(bucket, counts, perPartitionNormalization);
}
}
/**
 * Update the anomaly score and unusual score fields on the bucket provided
* and all contained records
*
* @param counts Element 0 will be incremented if we update a document and
* element 1 if we don't
*/
private void updateSingleBucket(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
updateBucketIfItHasBigChange(bucket, counts, perPartitionNormalization);
updateRecordsThatHaveBigChange(bucket, counts);
}
private void updateBucketIfItHasBigChange(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
if (bucket.hadBigNormalizedUpdate()) {
if (perPartitionNormalization) {
updatesPersister.updatePerPartitionMaxProbabilities(bucket.getJobId(), bucket.getRecords());
}
updatesPersister.updateBucket(bucket);
++counts[0];
} else {
++counts[1];
}
}
private void updateRecordsThatHaveBigChange(Bucket bucket, int[] counts) {
List<AnomalyRecord> toUpdate = new ArrayList<>();
for (AnomalyRecord record : bucket.getRecords()) {
if (record.hadBigNormalizedUpdate()) {
toUpdate.add(record);
++counts[0];
} else {
++counts[1];
}
}
if (!toUpdate.isEmpty()) {
updatesPersister.updateRecords(bucket.getId(), toUpdate);
}
}
private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
long windowExtensionMs, int[] counts) {
BatchedDocumentsIterator<Influencer> influencersIterator = jobProvider
.newBatchedInfluencersIterator(job.getId())
.timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs);
while (influencersIterator.hasNext()) {
Deque<Influencer> influencers = influencersIterator.next();
if (influencers.isEmpty()) {
LOGGER.debug("[{}] No influencers to renormalize for job", job.getId());
break;
}
LOGGER.debug("[{}] Will renormalize a batch of {} influencers", job.getId(), influencers.size());
List<Normalizable> asNormalizables = influencers.stream()
.map(bucket -> new InfluencerNormalizable(bucket)).collect(Collectors.toList());
normalizer.normalize(bucketSpan, perPartitionNormalization, asNormalizables, quantilesState);
List<Influencer> toUpdate = new ArrayList<>();
for (Influencer influencer : influencers) {
if (influencer.hadBigNormalizedUpdate()) {
toUpdate.add(influencer);
++counts[0];
} else {
++counts[1];
}
}
if (!toUpdate.isEmpty()) {
updatesPersister.updateInfluencer(job.getId(), toUpdate);
}
}
}
}

View File

@ -5,22 +5,22 @@
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer.noop;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
/**
* A {@link Renormaliser} implementation that does absolutely nothing
* This should be removed when the normaliser code is ported
 * A {@link Renormalizer} implementation that does absolutely nothing.
* This should be removed when the normalizer code is ported
*/
public class NoOpRenormaliser implements Renormaliser {
// NORELEASE Remove once the normaliser code is ported
public class NoOpRenormalizer implements Renormalizer {
// NORELEASE Remove once the normalizer code is ported
@Override
public void renormalise(Quantiles quantiles) {
public void renormalize(Quantiles quantiles) {
}
@Override
public void renormaliseWithPartition(Quantiles quantiles) {
public void renormalizeWithPartition(Quantiles quantiles) {
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer.output;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerResult;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Reads normalizer output.
*/
public class NormalizerResultHandler extends AbstractComponent {
private static final int READ_BUF_SIZE = 1024;
private final InputStream inputStream;
private final List<NormalizerResult> normalizedResults;
public NormalizerResultHandler(Settings settings, InputStream inputStream) {
super(settings);
this.inputStream = inputStream;
normalizedResults = new ArrayList<>();
}
public List<NormalizerResult> getNormalizedResults() {
return normalizedResults;
}
public void process() throws IOException {
XContent xContent = XContentFactory.xContent(XContentType.JSON);
BytesReference bytesRef = null;
byte[] readBuf = new byte[READ_BUF_SIZE];
for (int bytesRead = inputStream.read(readBuf); bytesRead != -1; bytesRead = inputStream.read(readBuf)) {
if (bytesRef == null) {
bytesRef = new BytesArray(readBuf, 0, bytesRead);
} else {
bytesRef = new CompositeBytesReference(bytesRef, new BytesArray(readBuf, 0, bytesRead));
}
bytesRef = parseResults(xContent, bytesRef);
readBuf = new byte[READ_BUF_SIZE];
}
}
private BytesReference parseResults(XContent xContent, BytesReference bytesRef) throws IOException {
byte marker = xContent.streamSeparator();
int from = 0;
while (true) {
int nextMarker = findNextMarker(marker, bytesRef, from);
if (nextMarker == -1) {
// No more markers in this block
break;
}
// Ignore blank lines
if (nextMarker > from) {
parseResult(xContent, bytesRef.slice(from, nextMarker - from));
}
from = nextMarker + 1;
}
if (from >= bytesRef.length()) {
return null;
}
return bytesRef.slice(from, bytesRef.length() - from);
}
private void parseResult(XContent xContent, BytesReference bytesRef) throws IOException {
XContentParser parser = xContent.createParser(bytesRef);
NormalizerResult result = NormalizerResult.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
normalizedResults.add(result);
}
private static int findNextMarker(byte marker, BytesReference bytesRef, int from) {
for (int i = from; i < bytesRef.length(); ++i) {
if (bytesRef.get(i) == marker) {
return i;
}
}
return -1;
}
}
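
A minimal usage sketch (not part of this commit) for the handler above. It assumes newline-delimited JSON on the process output stream and Settings.EMPTY for the component settings; process() may throw IOException, so the snippet belongs in a method that declares it.

// Hedged sketch: feed two lines of normalizer output through the handler and read the results back.
String output = "{\"probability\":0.01,\"normalized_score\":2.2}\n"
        + "{\"probability\":0.05,\"normalized_score\":10.0}\n";
InputStream stream = new ByteArrayInputStream(output.getBytes(StandardCharsets.UTF_8));
NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, stream);
handler.process();   // reads until end of stream, parsing one NormalizerResult per line
List<NormalizerResult> results = handler.getNormalizedResults();
assert results.size() == 2 && results.get(1).getNormalizedScore() == 10.0;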

View File

@ -72,7 +72,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
public static final ParseField CAUSES = new ParseField("causes");
/**
* Normalisation
* Normalization
*/
public static final ParseField ANOMALY_SCORE = new ParseField("anomaly_score");
public static final ParseField NORMALIZED_PROBABILITY = new ParseField("normalized_probability");
@ -147,7 +147,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
private List<Influence> influencers;
private boolean hadBigNormalisedUpdate;
private boolean hadBigNormalizedUpdate;
public AnomalyRecord(String jobId, Date timestamp, long bucketSpan, int sequenceNum) {
this.jobId = jobId;
@ -190,7 +190,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
if (in.readBoolean()) {
influencers = in.readList(Influence::new);
}
hadBigNormalisedUpdate = in.readBoolean();
}
@Override
@ -235,7 +234,6 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
if (hasInfluencers) {
out.writeList(influencers);
}
out.writeBoolean(hadBigNormalisedUpdate);
}
@Override
@ -491,7 +489,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
// hadBigNormalisedUpdate is deliberately excluded from the hash
// hadBigNormalizedUpdate is deliberately excluded from the hash
return Objects.hash(jobId, detectorIndex, sequenceNum, bucketSpan, probability, anomalyScore,
normalizedProbability, initialNormalizedProbability, typical, actual,
@ -513,7 +511,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
AnomalyRecord that = (AnomalyRecord) other;
// hadBigNormalisedUpdate is deliberately excluded from the test
// hadBigNormalizedUpdate is deliberately excluded from the test
return Objects.equals(this.jobId, that.jobId)
&& this.detectorIndex == that.detectorIndex
&& this.sequenceNum == that.sequenceNum
@ -540,15 +538,15 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.influencers, that.influencers);
}
public boolean hadBigNormalisedUpdate() {
return this.hadBigNormalisedUpdate;
public boolean hadBigNormalizedUpdate() {
return this.hadBigNormalizedUpdate;
}
public void resetBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = false;
public void resetBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = false;
}
public void raiseBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = true;
public void raiseBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = true;
}
}

View File

@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* Bucket Result POJO
@ -97,7 +99,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private List<AnomalyRecord> records = Collections.emptyList();
private long eventCount;
private boolean isInterim;
private boolean hadBigNormalisedUpdate;
private boolean hadBigNormalizedUpdate;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>();
private long processingTimeMs;
private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
@ -121,7 +123,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
records = in.readList(AnomalyRecord::new);
eventCount = in.readLong();
isInterim = in.readBoolean();
hadBigNormalisedUpdate = in.readBoolean();
bucketInfluencers = in.readList(BucketInfluencer::new);
processingTimeMs = in.readLong();
perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue();
@ -140,7 +141,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
out.writeList(records);
out.writeLong(eventCount);
out.writeBoolean(isInterim);
out.writeBoolean(hadBigNormalisedUpdate);
out.writeList(bucketInfluencers);
out.writeLong(processingTimeMs);
out.writeGenericValue(perPartitionMaxProbability);
@ -315,7 +315,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
// hadBigNormalisedUpdate is deliberately excluded from the hash
// hadBigNormalizedUpdate is deliberately excluded from the hash
// as is id, which is generated by the datastore
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records,
isInterim, bucketSpan, bucketInfluencers);
@ -336,7 +336,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
Bucket that = (Bucket) other;
// hadBigNormalisedUpdate is deliberately excluded from the test
// hadBigNormalizedUpdate is deliberately excluded from the test
// as is id, which is generated by the datastore
return Objects.equals(this.jobId, that.jobId) && Objects.equals(this.timestamp, that.timestamp)
&& (this.eventCount == that.eventCount) && (this.bucketSpan == that.bucketSpan)
@ -346,35 +346,35 @@ public class Bucket extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
}
public boolean hadBigNormalisedUpdate() {
return hadBigNormalisedUpdate;
public boolean hadBigNormalizedUpdate() {
return hadBigNormalizedUpdate;
}
public void resetBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = false;
public void resetBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = false;
}
public void raiseBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = true;
public void raiseBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = true;
}
/**
 * This method encapsulates the logic for whether a bucket should be
* normalised. The decision depends on two factors.
* normalized. The decision depends on two factors.
*
* The first is whether the bucket has bucket influencers. Since bucket
* influencers were introduced, every bucket must have at least one bucket
* influencer. If it does not, it means it is a bucket persisted with an
* older version and should not be normalised.
* older version and should not be normalized.
*
* The second factor has to do with minimising the number of buckets that
* are sent for normalisation. Buckets that have no records and a score of
* zero should not be normalised as their score will not change and they
* are sent for normalization. Buckets that have no records and a score of
* zero should not be normalized as their score will not change and they
* will just add overhead.
*
* @return true if the bucket should be normalised or false otherwise
* @return true if the bucket should be normalized or false otherwise
*/
public boolean isNormalisable() {
public boolean isNormalizable() {
if (bucketInfluencers == null || bucketInfluencers.isEmpty()) {
return false;
}

View File

@ -30,7 +30,7 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final String RESULT_TYPE_VALUE = "bucketInfluencer";
public static final ParseField RESULT_TYPE_FIELD = new ParseField(RESULT_TYPE_VALUE);
/*
/**
* Field names
*/
public static final ParseField INFLUENCER_FIELD_NAME = new ParseField("influencer_field_name");
@ -43,6 +43,11 @@ public class BucketInfluencer extends ToXContentToBytes implements Writeable {
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField SEQUENCE_NUM = new ParseField("sequence_num");
/**
* The influencer field name used for time influencers
*/
public static final String BUCKET_TIME = "bucket_time";
public static final ConstructingObjectParser<BucketInfluencer, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(RESULT_TYPE_FIELD.getPreferredName(), a -> new BucketInfluencer((String) a[0],
(Date) a[1], (long) a[2], (int) a[3]));

View File

@ -79,7 +79,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
private double probability;
private double initialAnomalyScore;
private double anomalyScore;
private boolean hadBigNormalisedUpdate;
private boolean hadBigNormalizedUpdate;
private boolean isInterim;
public Influencer(String jobId, String fieldName, String fieldValue, Date timestamp, long bucketSpan, int sequenceNum) {
@ -99,7 +99,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
probability = in.readDouble();
initialAnomalyScore = in.readDouble();
anomalyScore = in.readDouble();
hadBigNormalisedUpdate = in.readBoolean();
isInterim = in.readBoolean();
bucketSpan = in.readLong();
sequenceNum = in.readInt();
@ -114,7 +113,6 @@ public class Influencer extends ToXContentToBytes implements Writeable {
out.writeDouble(probability);
out.writeDouble(initialAnomalyScore);
out.writeDouble(anomalyScore);
out.writeBoolean(hadBigNormalisedUpdate);
out.writeBoolean(isInterim);
out.writeLong(bucketSpan);
out.writeInt(sequenceNum);
@ -190,22 +188,22 @@ public class Influencer extends ToXContentToBytes implements Writeable {
isInterim = value;
}
public boolean hadBigNormalisedUpdate() {
return this.hadBigNormalisedUpdate;
public boolean hadBigNormalizedUpdate() {
return this.hadBigNormalizedUpdate;
}
public void resetBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = false;
public void resetBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = false;
}
public void raiseBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = true;
public void raiseBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = true;
}
@Override
public int hashCode() {
// hadBigNormalisedUpdate is deliberately excluded from the hash
// hadBigNormalizedUpdate is deliberately excluded from the hash
return Objects.hash(jobId, timestamp, influenceField, influenceValue, initialAnomalyScore, anomalyScore, probability, isInterim,
bucketSpan, sequenceNum);
@ -227,7 +225,7 @@ public class Influencer extends ToXContentToBytes implements Writeable {
Influencer other = (Influencer) obj;
// hadBigNormalisedUpdate is deliberately excluded from the test
// hadBigNormalizedUpdate is deliberately excluded from the test
return Objects.equals(jobId, other.jobId) && Objects.equals(timestamp, other.timestamp)
&& Objects.equals(influenceField, other.influenceField)
&& Objects.equals(influenceValue, other.influenceValue)

View File

@ -23,7 +23,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
private String partitionFieldName;
private double anomalyScore;
private double probability;
private boolean hadBigNormalisedUpdate;
private boolean hadBigNormalizedUpdate;
public static final ConstructingObjectParser<PartitionScore, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
PARTITION_SCORE.getPreferredName(), a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3]));
@ -36,7 +36,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
}
public PartitionScore(String fieldName, String fieldValue, double anomalyScore, double probability) {
hadBigNormalisedUpdate = false;
hadBigNormalizedUpdate = false;
partitionFieldName = fieldName;
partitionFieldValue = fieldValue;
this.anomalyScore = anomalyScore;
@ -48,7 +48,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
partitionFieldValue = in.readString();
anomalyScore = in.readDouble();
probability = in.readDouble();
hadBigNormalisedUpdate = in.readBoolean();
}
@Override
@ -57,7 +56,6 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
out.writeString(partitionFieldValue);
out.writeDouble(anomalyScore);
out.writeDouble(probability);
out.writeBoolean(hadBigNormalisedUpdate);
}
@Override
@ -120,22 +118,22 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
PartitionScore that = (PartitionScore) other;
// hadBigNormalisedUpdate is deliberately excluded from the test
// hadBigNormalizedUpdate is deliberately excluded from the test
// as is id, which is generated by the datastore
return Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
&& Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability)
&& (this.anomalyScore == that.anomalyScore);
}
public boolean hadBigNormalisedUpdate() {
return hadBigNormalisedUpdate;
public boolean hadBigNormalizedUpdate() {
return hadBigNormalizedUpdate;
}
public void resetBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = false;
public void resetBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = false;
}
public void raiseBigNormalisedUpdateFlag() {
hadBigNormalisedUpdate = true;
public void raiseBigNormalizedUpdateFlag() {
hadBigNormalizedUpdate = true;
}
}

View File

@ -29,7 +29,7 @@ import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* When per-partition normalisation is enabled this class represents
* When per-partition normalization is enabled this class represents
* the max anomalous probabilities of each partition per bucket. These values
 * are calculated from the bucket's anomaly records.
*/
@ -124,7 +124,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
Optional<PartitionProbability> first =
perPartitionMaxProbabilities.stream().filter(pp -> partitionValue.equals(pp.getPartitionValue())).findFirst();
return first.isPresent() ? first.get().getMaxNormalisedProbability() : 0.0;
return first.isPresent() ? first.get().getMaxNormalizedProbability() : 0.0;
}
/**
@ -201,7 +201,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
/**
* Class for partitionValue, maxNormalisedProb pairs
* Class for partitionValue, maxNormalizedProb pairs
*/
public static class PartitionProbability extends ToXContentToBytes implements Writeable {
@ -215,44 +215,44 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
}
private final String partitionValue;
private final double maxNormalisedProbability;
private final double maxNormalizedProbability;
PartitionProbability(String partitionName, double maxNormalisedProbability) {
PartitionProbability(String partitionName, double maxNormalizedProbability) {
this.partitionValue = partitionName;
this.maxNormalisedProbability = maxNormalisedProbability;
this.maxNormalizedProbability = maxNormalizedProbability;
}
public PartitionProbability(StreamInput in) throws IOException {
partitionValue = in.readString();
maxNormalisedProbability = in.readDouble();
maxNormalizedProbability = in.readDouble();
}
public String getPartitionValue() {
return partitionValue;
}
public double getMaxNormalisedProbability() {
return maxNormalisedProbability;
public double getMaxNormalizedProbability() {
return maxNormalizedProbability;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionValue);
out.writeDouble(maxNormalisedProbability);
out.writeDouble(maxNormalizedProbability);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionValue)
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalisedProbability)
.field(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability)
.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(partitionValue, maxNormalisedProbability);
return Objects.hash(partitionValue, maxNormalizedProbability);
}
@Override
@ -268,7 +268,7 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
PartitionProbability that = (PartitionProbability) other;
return Objects.equals(this.partitionValue, that.partitionValue)
&& this.maxNormalisedProbability == that.maxNormalisedProbability;
&& this.maxNormalizedProbability == that.maxNormalizedProbability;
}
}
}

View File

@ -93,8 +93,8 @@ job.config.overField.needs.another = over_field_name must be used in conjunction
job.config.overlapping.buckets.incompatible.function = Overlapping buckets cannot be used with function ''{0}''
job.config.multiple.bucketspans.require.bucket_span = Multiple bucket_spans require a bucket_span to be specified
job.config.multiple.bucketspans.must.be.multiple = Multiple bucket_span ''{0}'' must be a multiple of the main bucket_span ''{1}''
job.config.per.partition.normalisation.requires.partition.field = If the job is configured with Per-Partition Normalization enabled a detector must have a partition field
job.config.per.partition.normalisation.cannot.use.influencers = A job configured with Per-Partition Normalization cannot use influencers
job.config.per.partition.normalization.requires.partition.field = If the job is configured with Per-Partition Normalization enabled a detector must have a partition field
job.config.per.partition.normalization.cannot.use.influencers = A job configured with Per-Partition Normalization cannot use influencers
job.config.update.analysis.limits.parse.error = JSON parse error reading the update value for analysis_limits
job.config.update.analysis.limits.cannot.be.null = Invalid update value for analysis_limits: null

View File

@ -130,12 +130,12 @@ public class ProcessCtrlTests extends ESTestCase {
assertTrue(command.contains("--ignoreDowntime"));
}
public void testBuildNormaliserCommand() throws IOException {
public void testBuildNormalizerCommand() throws IOException {
Environment env = new Environment(
Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
String jobId = "unit-test-job";
List<String> command = ProcessCtrl.buildNormaliserCommand(env, jobId, null, 300, true, pid);
List<String> command = ProcessCtrl.buildNormalizerCommand(env, jobId, null, 300, true, pid);
assertEquals(5, command.size());
assertTrue(command.contains(ProcessCtrl.NORMALIZE_PATH));
assertTrue(command.contains(ProcessCtrl.BUCKET_SPAN_ARG + "300"));

View File

@ -9,7 +9,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
@ -52,21 +52,21 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
when(parser.parseResults(any())).thenReturn(stream);
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, parser);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, parser);
processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
verify(renormaliser, times(1)).shutdown();
verify(renormalizer, times(1)).shutdown();
assertEquals(0, processor.completionLatch.getCount());
}
public void testProcessResult_bucket() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -83,12 +83,12 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_bucket_deleteInterimRequired() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
AutodetectResult result = mock(AutodetectResult.class);
@ -105,11 +105,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false, bulkBuilder);
context.deleteInterimRequired = false;
@ -125,11 +125,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_records_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", true, bulkBuilder);
context.deleteInterimRequired = false;
@ -148,11 +148,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_influencers() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -168,10 +168,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_categoryDefinition() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -185,11 +185,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgement() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -206,11 +206,11 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
FlushListener flushListener = mock(FlushListener.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null, flushListener);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -232,10 +232,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelDebugOutput() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -249,10 +249,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelSizeStats() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -267,10 +267,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_modelSnapshot() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -284,10 +284,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
}
public void testProcessResult_quantiles() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
context.deleteInterimRequired = false;
@ -297,16 +297,16 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormaliser, times(1)).renormalise(quantiles);
verify(renormalizer, times(1)).renormalize(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormaliser);
verifyNoMoreInteractions(renormalizer);
}
public void testProcessResult_quantiles_isPerPartitionNormalization() {
Renormaliser renormaliser = mock(Renormaliser.class);
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, true, bulkBuilder);
context.deleteInterimRequired = false;
@ -316,9 +316,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormaliser, times(1)).renormaliseWithPartition(quantiles);
verify(renormalizer, times(1)).renormalizeWithPartition(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormaliser);
verifyNoMoreInteractions(renormalizer);
}
}

View File

@ -34,7 +34,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"max_normalized_probability\":0, \"anomaly_score\":0,\"record_count\":0,\"event_count\":806,\"bucket_influencers\":["
+ "{\"sequence_num\":1,\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0,"
+ "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\","
+ "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.1, normaliser 2" +
+ "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.1, normalizer 2" +
".1]\"}}"
+ ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":"
+ "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":1,\"job_id\":\"foo\",\"probability\":0.0637541,"
@ -58,8 +58,8 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"sequence_num\":6,\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03,"
+ "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": "
+ "{\"job_id\":\"foo\","
+ "\"quantile_state\":\"[normaliser 1.2, normaliser 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
+ "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normaliser 1.3, normaliser 2.3]\"}} ]";
+ "\"quantile_state\":\"[normalizer 1.2, normalizer 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
+ "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.3, normalizer 2.3]\"}} ]";
public static final String POPULATION_OUTPUT_SAMPLE = "[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08,"
+ "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\","
@ -315,13 +315,13 @@ public class AutodetectResultsParserTests extends ESTestCase {
assertEquals(3, quantiles.size());
assertEquals("foo", quantiles.get(0).getJobId());
assertNull(quantiles.get(0).getTimestamp());
assertEquals("[normaliser 1.1, normaliser 2.1]", quantiles.get(0).getQuantileState());
assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState());
assertEquals("foo", quantiles.get(1).getJobId());
assertNull(quantiles.get(1).getTimestamp());
assertEquals("[normaliser 1.2, normaliser 2.2]", quantiles.get(1).getQuantileState());
assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState());
assertEquals("foo", quantiles.get(2).getJobId());
assertNull(quantiles.get(2).getTimestamp());
assertEquals("[normaliser 1.3, normaliser 2.3]", quantiles.get(2).getQuantileState());
assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState());
}
@AwaitsFix(bugUrl = "rewrite this test so it doesn't use ~200 lines of json")

View File

@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.junit.Before;
import java.util.Date;
public class BucketInfluencerNormalizableTests extends ESTestCase {
private static final double EPSILON = 0.0001;
private BucketInfluencer bucketInfluencer;
@Before
public void setUpBucketInfluencer() {
bucketInfluencer = new BucketInfluencer("foo", new Date(), 600, 1);
bucketInfluencer.setInfluencerFieldName("airline");
bucketInfluencer.setProbability(0.05);
bucketInfluencer.setRawAnomalyScore(3.14);
bucketInfluencer.setInitialAnomalyScore(2.0);
bucketInfluencer.setAnomalyScore(1.0);
}
public void testIsContainerOnly() {
assertFalse(new BucketInfluencerNormalizable(bucketInfluencer).isContainerOnly());
}
public void testGetLevel() {
assertEquals(Level.BUCKET_INFLUENCER, new BucketInfluencerNormalizable(bucketInfluencer).getLevel());
BucketInfluencer timeInfluencer = new BucketInfluencer("foo", new Date(), 600, 1);
timeInfluencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
assertEquals(Level.ROOT, new BucketInfluencerNormalizable(timeInfluencer).getLevel());
}
public void testGetPartitionFieldName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getPartitionFieldName());
}
public void testGetPersonFieldName() {
assertEquals("airline", new BucketInfluencerNormalizable(bucketInfluencer).getPersonFieldName());
}
public void testGetFunctionName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getFunctionName());
}
public void testGetValueFieldName() {
assertNull(new BucketInfluencerNormalizable(bucketInfluencer).getValueFieldName());
}
public void testGetProbability() {
assertEquals(0.05, new BucketInfluencerNormalizable(bucketInfluencer).getProbability(), EPSILON);
}
public void testGetNormalizedScore() {
assertEquals(1.0, new BucketInfluencerNormalizable(bucketInfluencer).getNormalizedScore(), EPSILON);
}
public void testSetNormalizedScore() {
BucketInfluencerNormalizable normalizable = new BucketInfluencerNormalizable(bucketInfluencer);
normalizable.setNormalizedScore(99.0);
assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
assertEquals(99.0, bucketInfluencer.getAnomalyScore(), EPSILON);
}
public void testGetChildrenTypes() {
assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildrenTypes().isEmpty());
}
public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).getChildren(0));
}
public void testGetChildren() {
assertTrue(new BucketInfluencerNormalizable(bucketInfluencer).getChildren().isEmpty());
}
public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class, () -> new BucketInfluencerNormalizable(bucketInfluencer).setMaxChildrenScore(0, 42.0));
}
public void testSetParentScore() {
new BucketInfluencerNormalizable(bucketInfluencer).setParentScore(42.0);
assertEquals("airline", bucketInfluencer.getInfluencerFieldName());
assertEquals(1.0, bucketInfluencer.getAnomalyScore(), EPSILON);
assertEquals(3.14, bucketInfluencer.getRawAnomalyScore(), EPSILON);
assertEquals(2.0, bucketInfluencer.getInitialAnomalyScore(), EPSILON);
assertEquals(0.05, bucketInfluencer.getProbability(), EPSILON);
}
public void testResetBigChangeFlag() {
new BucketInfluencerNormalizable(bucketInfluencer).resetBigChangeFlag();
}
public void testRaiseBigChangeFlag() {
new BucketInfluencerNormalizable(bucketInfluencer).raiseBigChangeFlag();
}
}

View File

@ -0,0 +1,188 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.PartitionScore;
import org.junit.Before;
public class BucketNormalizableTests extends ESTestCase {
private static final double EPSILON = 0.0001;
private Bucket bucket;
@Before
public void setUpBucket() {
bucket = new Bucket("foo", new Date(), 600);
BucketInfluencer bucketInfluencer1 = new BucketInfluencer("foo", bucket.getTimestamp(), 600, 1);
bucketInfluencer1.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
bucketInfluencer1.setAnomalyScore(42.0);
bucketInfluencer1.setProbability(0.01);
BucketInfluencer bucketInfluencer2 = new BucketInfluencer("foo", bucket.getTimestamp(), 600, 2);
bucketInfluencer2.setInfluencerFieldName("foo");
bucketInfluencer2.setAnomalyScore(88.0);
bucketInfluencer2.setProbability(0.001);
bucket.setBucketInfluencers(Arrays.asList(bucketInfluencer1, bucketInfluencer2));
bucket.setAnomalyScore(88.0);
bucket.setMaxNormalizedProbability(2.0);
AnomalyRecord record1 = new AnomalyRecord("foo", bucket.getTimestamp(), 600, 3);
record1.setNormalizedProbability(1.0);
AnomalyRecord record2 = new AnomalyRecord("foo", bucket.getTimestamp(), 600, 4);
record2.setNormalizedProbability(2.0);
bucket.setRecords(Arrays.asList(record1, record2));
List<PartitionScore> partitionScores = new ArrayList<>();
partitionScores.add(new PartitionScore("pf1", "pv1", 0.2, 0.1));
partitionScores.add(new PartitionScore("pf1", "pv2", 0.4, 0.01));
bucket.setPartitionScores(partitionScores);
}
public void testIsContainerOnly() {
assertTrue(new BucketNormalizable(bucket).isContainerOnly());
}
public void testGetLevel() {
assertEquals(Level.ROOT, new BucketNormalizable(bucket).getLevel());
}
public void testGetPartitionFieldName() {
assertNull(new BucketNormalizable(bucket).getPartitionFieldName());
}
public void testGetPartitionFieldValue() {
assertNull(new BucketNormalizable(bucket).getPartitionFieldValue());
}
public void testGetPersonFieldName() {
assertNull(new BucketNormalizable(bucket).getPersonFieldName());
}
public void testGetFunctionName() {
assertNull(new BucketNormalizable(bucket).getFunctionName());
}
public void testGetValueFieldName() {
assertNull(new BucketNormalizable(bucket).getValueFieldName());
}
public void testGetProbability() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).getProbability());
}
public void testGetNormalizedScore() {
assertEquals(88.0, new BucketNormalizable(bucket).getNormalizedScore(), EPSILON);
}
public void testSetNormalizedScore() {
BucketNormalizable normalizable = new BucketNormalizable(bucket);
normalizable.setNormalizedScore(99.0);
assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
assertEquals(99.0, bucket.getAnomalyScore(), EPSILON);
}
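// getChildren() returns bucket influencers first, then records, then partition scores,
// which is the ordering the assertions below rely on.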
public void testGetChildren() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren();
assertEquals(6, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
assertTrue(children.get(2) instanceof RecordNormalizable);
assertEquals(1.0, children.get(2).getNormalizedScore(), EPSILON);
assertTrue(children.get(3) instanceof RecordNormalizable);
assertEquals(2.0, children.get(3).getNormalizedScore(), EPSILON);
assertTrue(children.get(4) instanceof PartitionScoreNormalizable);
assertEquals(0.2, children.get(4).getNormalizedScore(), EPSILON);
assertTrue(children.get(5) instanceof PartitionScoreNormalizable);
assertEquals(0.4, children.get(5).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenTypeBucketInfluencer() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren(0);
assertEquals(2, children.size());
assertTrue(children.get(0) instanceof BucketInfluencerNormalizable);
assertEquals(42.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof BucketInfluencerNormalizable);
assertEquals(88.0, children.get(1).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenTypeRecord() {
List<Normalizable> children = new BucketNormalizable(bucket).getChildren(1);
assertEquals(2, children.size());
assertTrue(children.get(0) instanceof RecordNormalizable);
assertEquals(1.0, children.get(0).getNormalizedScore(), EPSILON);
assertTrue(children.get(1) instanceof RecordNormalizable);
assertEquals(2.0, children.get(1).getNormalizedScore(), EPSILON);
}
public void testGetChildren_GivenInvalidType() {
expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).getChildren(3));
}
public void testSetMaxChildrenScore_GivenDifferentScores() {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket);
assertTrue(bucketNormalizable.setMaxChildrenScore(0, 95.0));
assertTrue(bucketNormalizable.setMaxChildrenScore(1, 42.0));
assertEquals(95.0, bucket.getAnomalyScore(), EPSILON);
assertEquals(42.0, bucket.getMaxNormalizedProbability(), EPSILON);
}
public void testSetMaxChildrenScore_GivenSameScores() {
BucketNormalizable bucketNormalizable = new BucketNormalizable(bucket);
assertFalse(bucketNormalizable.setMaxChildrenScore(0, 88.0));
assertFalse(bucketNormalizable.setMaxChildrenScore(1, 2.0));
assertEquals(88.0, bucket.getAnomalyScore(), EPSILON);
assertEquals(2.0, bucket.getMaxNormalizedProbability(), EPSILON);
}
public void testSetMaxChildrenScore_GivenInvalidType() {
expectThrows(IllegalArgumentException.class, () -> new BucketNormalizable(bucket).setMaxChildrenScore(3, 95.0));
}
public void testSetParentScore() {
expectThrows(IllegalStateException.class, () -> new BucketNormalizable(bucket).setParentScore(42.0));
}
public void testResetBigChangeFlag() {
BucketNormalizable normalizable = new BucketNormalizable(bucket);
normalizable.raiseBigChangeFlag();
normalizable.resetBigChangeFlag();
assertFalse(bucket.hadBigNormalizedUpdate());
}
public void testRaiseBigChangeFlag() {
BucketNormalizable normalizable = new BucketNormalizable(bucket);
normalizable.resetBigChangeFlag();
normalizable.raiseBigChangeFlag();
assertTrue(bucket.hadBigNormalizedUpdate());
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.junit.Before;
import java.util.Date;
public class InfluencerNormalizableTests extends ESTestCase {
private static final double EPSILON = 0.0001;
private Influencer influencer;
@Before
public void setUpInfluencer() {
influencer = new Influencer("foo", "airline", "AAL", new Date(), 600, 1);
influencer.setAnomalyScore(1.0);
influencer.setInitialAnomalyScore(2.0);
influencer.setProbability(0.05);
}
public void testIsContainerOnly() {
assertFalse(new InfluencerNormalizable(influencer).isContainerOnly());
}
public void testGetLevel() {
assertEquals(Level.INFLUENCER, new InfluencerNormalizable(influencer).getLevel());
}
public void testGetPartitionFieldName() {
assertNull(new InfluencerNormalizable(influencer).getPartitionFieldName());
}
public void testGetPartitionFieldValue() {
assertNull(new InfluencerNormalizable(influencer).getPartitionFieldValue());
}
public void testGetPersonFieldName() {
assertEquals("airline", new InfluencerNormalizable(influencer).getPersonFieldName());
}
public void testGetFunctionName() {
assertNull(new InfluencerNormalizable(influencer).getFunctionName());
}
public void testGetValueFieldName() {
assertNull(new InfluencerNormalizable(influencer).getValueFieldName());
}
public void testGetProbability() {
assertEquals(0.05, new InfluencerNormalizable(influencer).getProbability(), EPSILON);
}
public void testGetNormalizedScore() {
assertEquals(1.0, new InfluencerNormalizable(influencer).getNormalizedScore(), EPSILON);
}
public void testSetNormalizedScore() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
normalizable.setNormalizedScore(99.0);
assertEquals(99.0, normalizable.getNormalizedScore(), EPSILON);
assertEquals(99.0, influencer.getAnomalyScore(), EPSILON);
}
public void testGetChildrenTypes() {
assertTrue(new InfluencerNormalizable(influencer).getChildrenTypes().isEmpty());
}
public void testGetChildren_ByType() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).getChildren(0));
}
public void testGetChildren() {
assertTrue(new InfluencerNormalizable(influencer).getChildren().isEmpty());
}
public void testSetMaxChildrenScore() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setMaxChildrenScore(0, 42.0));
}
public void testSetParentScore() {
expectThrows(IllegalStateException.class, () -> new InfluencerNormalizable(influencer).setParentScore(42.0));
}
public void testResetBigChangeFlag() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
normalizable.raiseBigChangeFlag();
normalizable.resetBigChangeFlag();
assertFalse(influencer.hadBigNormalizedUpdate());
}
public void testRaiseBigChangeFlag() {
InfluencerNormalizable normalizable = new InfluencerNormalizable(influencer);
normalizable.resetBigChangeFlag();
normalizable.raiseBigChangeFlag();
assertTrue(influencer.hadBigNormalizedUpdate());
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
public class NormalizerResultTests extends AbstractSerializingTestCase<NormalizerResult> {
private static final double EPSILON = 0.0000000001;
public void testDefaultConstructor() {
NormalizerResult msg = new NormalizerResult();
assertNull(msg.getLevel());
assertNull(msg.getPartitionFieldName());
assertNull(msg.getPartitionFieldValue());
assertNull(msg.getPersonFieldName());
assertNull(msg.getFunctionName());
assertNull(msg.getValueFieldName());
assertEquals(0.0, msg.getProbability(), EPSILON);
assertEquals(0.0, msg.getNormalizedScore(), EPSILON);
}
@Override
protected NormalizerResult createTestInstance() {
NormalizerResult msg = new NormalizerResult();
msg.setLevel("leaf");
msg.setPartitionFieldName("part");
msg.setPartitionFieldValue("something");
msg.setPersonFieldName("person");
msg.setFunctionName("mean");
msg.setValueFieldName("value");
msg.setProbability(0.005);
msg.setNormalizedScore(98.7);
return msg;
}
@Override
protected Reader<NormalizerResult> instanceReader() {
return NormalizerResult::new;
}
@Override
protected NormalizerResult parseInstance(XContentParser parser, ParseFieldMatcher matcher) {
return NormalizerResult.PARSER.apply(parser, () -> matcher);
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class NormalizerTests extends ESTestCase {
private static final String JOB_ID = "foo";
private static final String QUANTILES_STATE = "someState";
private static final int BUCKET_SPAN = 600;
private static final double INITIAL_SCORE = 2.0;
private static final double FACTOR = 2.0;
private Bucket generateBucket(Date timestamp) throws IOException {
return new Bucket(JOB_ID, timestamp, BUCKET_SPAN);
}
private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) {
BucketInfluencer influencer = new BucketInfluencer(JOB_ID, timestamp, BUCKET_SPAN, 1);
influencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
influencer.setProbability(probability);
influencer.setInitialAnomalyScore(anomalyScore);
influencer.setAnomalyScore(anomalyScore);
return influencer;
}
public void testNormalize() throws IOException {
ExecutorService threadpool = Executors.newScheduledThreadPool(1);
NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
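// Stub the factory to return a process that simply multiplies each score by FACTOR,
// so the expected normalized score below is FACTOR * INITIAL_SCORE.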
when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false),
any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
List<Normalizable> asNormalizables = buckets.stream()
.map(b -> new BucketNormalizable(b)).collect(Collectors.toList());
normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
threadpool.shutdown();
assertEquals(1, asNormalizables.size());
assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001);
}
}

View File

@ -0,0 +1,448 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.List;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
import org.elasticsearch.xpack.prelert.job.persistence.MockBatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.junit.Before;
import org.mockito.MockitoAnnotations;
public class ScoresUpdaterTests extends ESTestCase {
private static final String JOB_ID = "foo";
private static final String QUANTILES_STATE = "someState";
private static final long DEFAULT_BUCKET_SPAN = 3600;
private static final long DEFAULT_START_TIME = 0;
private static final long DEFAULT_END_TIME = 3600;
private JobProvider jobProvider = mock(JobProvider.class);
private JobRenormalizer jobRenormalizer = mock(JobRenormalizer.class);
private Normalizer normalizer = mock(Normalizer.class);
private NormalizerFactory normalizerFactory = mock(NormalizerFactory.class);
private Job job;
private ScoresUpdater scoresUpdater;
private Bucket generateBucket(Date timestamp) throws IOException {
return new Bucket(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN);
}
@Before
public void setUpMocks() throws IOException {
MockitoAnnotations.initMocks(this);
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
jobBuilder.setRenormalizationWindowDays(1L);
List<Detector> detectors = new ArrayList<>();
detectors.add(mock(Detector.class));
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(detectors);
configBuilder.setBucketSpan(DEFAULT_BUCKET_SPAN);
jobBuilder.setAnalysisConfig(configBuilder);
job = jobBuilder.build();
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizer, normalizerFactory);
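// Default stubbing: an empty results window and a mock normalizer from the factory;
// individual tests override the buckets and influencers as needed.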
givenProviderReturnsNoBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME);
givenProviderReturnsNoInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME);
givenNormalizerFactoryReturnsMock();
}
public void testUpdate_GivenBucketWithZeroScoreAndNoRecords() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.7, 0.0));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(0);
verifyBucketWasNotUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenBucketWithNonZeroScoreButNoBucketInfluencers() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(30.0);
bucket.setBucketInfluencers(null);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(0);
verifyBucketWasNotUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenSingleBucketWithoutBigChangeAndNoRecords() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(30.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenSingleBucketWithoutBigChangeAndRecordsWithoutBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(30.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 30.0));
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithoutBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
records.add(record1);
records.add(record2);
bucket.setRecords(records);
bucket.setRecordCount(2);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
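// A raised big-change flag means the normalized score moved enough that the bucket must be re-persisted.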
public void testUpdate_GivenSingleBucketWithBigChangeAndNoRecords() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testUpdate_GivenSingleBucketWithoutBigChangeAndSomeRecordsWithBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
AnomalyRecord record3 = createRecordWithBigChange();
records.add(record1);
records.add(record2);
records.add(record3);
bucket.setRecords(records);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
}
public void testUpdate_GivenSingleBucketWithBigChangeAndSomeRecordsWithBigChange() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
List<AnomalyRecord> records = new ArrayList<>();
AnomalyRecord record1 = createRecordWithBigChange();
AnomalyRecord record2 = createRecordWithoutBigChange();
AnomalyRecord record3 = createRecordWithBigChange();
records.add(record1);
records.add(record2);
records.add(record3);
bucket.setRecords(records);
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, buckets);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
}
public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
Deque<Bucket> batch1 = new ArrayDeque<>();
for (int i = 0; i < 10000; ++i) {
Bucket bucket = generateBucket(new Date(i * 1000));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
batch1.add(bucket);
}
Bucket secondBatchBucket = generateBucket(new Date(10000 * 1000));
secondBatchBucket.addBucketInfluencer(createTimeBucketInfluencer(secondBatchBucket.getTimestamp(), 0.04, 42.0));
secondBatchBucket.setAnomalyScore(42.0);
secondBatchBucket.setMaxNormalizedProbability(50.0);
secondBatchBucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> batch2 = new ArrayDeque<>();
batch2.add(secondBatchBucket);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch1, batch2);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
// Batch 1 - Just verify first and last were updated as Mockito
// is prohibitively slow when trying to verify all 10000
verifyBucketWasUpdated(batch1.getFirst());
verifyBucketRecordsWereNotUpdated(batch1.getFirst().getId());
verifyBucketWasUpdated(batch1.getLast());
verifyBucketRecordsWereNotUpdated(batch1.getLast().getId());
verifyBucketWasUpdated(secondBatchBucket);
verifyBucketRecordsWereNotUpdated(secondBatchBucket.getId());
}
public void testUpdate_GivenTwoBucketsWithFirstHavingEnoughRecordsToForceSecondNormalization() throws IOException {
Bucket bucket1 = generateBucket(new Date(0));
bucket1.setAnomalyScore(42.0);
bucket1.addBucketInfluencer(createTimeBucketInfluencer(bucket1.getTimestamp(), 0.04, 42.0));
bucket1.setMaxNormalizedProbability(50.0);
bucket1.raiseBigNormalizedUpdateFlag();
when(jobProvider.expandBucket(JOB_ID, false, bucket1)).thenReturn(100000);
Bucket bucket2 = generateBucket(new Date(10000 * 1000));
bucket2.addBucketInfluencer(createTimeBucketInfluencer(bucket2.getTimestamp(), 0.04, 42.0));
bucket2.setAnomalyScore(42.0);
bucket2.setMaxNormalizedProbability(50.0);
bucket2.raiseBigNormalizedUpdateFlag();
Deque<Bucket> batch = new ArrayDeque<>();
batch.add(bucket1);
batch.add(bucket2);
givenProviderReturnsBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME, batch);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(2);
verifyBucketWasUpdated(bucket1);
verifyBucketRecordsWereNotUpdated(bucket1.getId());
verifyBucketWasUpdated(bucket2);
verifyBucketRecordsWereNotUpdated(bucket2.getId());
}
public void testUpdate_GivenInfluencerWithBigChange() throws IOException {
Influencer influencer = new Influencer(JOB_ID, "n", "v", new Date(DEFAULT_START_TIME), 600, 1);
influencer.raiseBigNormalizedUpdateFlag();
Deque<Influencer> influencers = new ArrayDeque<>();
influencers.add(influencer);
givenProviderReturnsInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME, influencers);
scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
verifyNormalizerWasInvoked(1);
verifyInfluencerWasUpdated(influencer);
}
public void testDefaultRenormalizationWindowBasedOnTime() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
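// The stubbed window spans exactly the 24 hours (86,400,000 ms) leading up to the end time passed to update().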
givenProviderReturnsBuckets(2509200000L, 2595600000L, buckets);
givenProviderReturnsNoInfluencers(2509200000L, 2595600000L);
scoresUpdater.update(QUANTILES_STATE, 2595600000L, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testManualRenormalizationWindow() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(3600000, 90000000L, buckets);
givenProviderReturnsNoInfluencers(3600000, 90000000L);
scoresUpdater.update(QUANTILES_STATE, 90000000L, 0, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
public void testManualRenormalizationWindow_GivenExtension() throws IOException {
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(42.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.04, 42.0));
bucket.setMaxNormalizedProbability(50.0);
bucket.raiseBigNormalizedUpdateFlag();
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
givenProviderReturnsBuckets(2700000, 90000000L, buckets);
givenProviderReturnsNoInfluencers(2700000, 90000000L);
scoresUpdater.update(QUANTILES_STATE, 90000000L, 900000, false);
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyBucketRecordsWereNotUpdated(bucket.getId());
}
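// The normalizer mock should always be invoked with the job's bucket span, the quantiles state
// and per-partition normalization disabled.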
private void verifyNormalizerWasInvoked(int times) throws IOException {
int bucketSpan = job.getAnalysisConfig() == null ? 0
: job.getAnalysisConfig().getBucketSpan().intValue();
verify(normalizer, times(times)).normalize(
eq(bucketSpan), eq(false), anyListOf(Normalizable.class),
eq(QUANTILES_STATE));
}
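// A bucket needs at least one bucket influencer (here the bucket_time one) to be considered normalizable.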
private BucketInfluencer createTimeBucketInfluencer(Date timestamp, double probability, double anomalyScore) {
BucketInfluencer influencer = new BucketInfluencer(JOB_ID, timestamp, DEFAULT_BUCKET_SPAN, 1);
influencer.setProbability(probability);
influencer.setAnomalyScore(anomalyScore);
influencer.setInfluencerFieldName(BucketInfluencer.BUCKET_TIME);
return influencer;
}
private void givenNormalizerFactoryReturnsMock() {
when(normalizerFactory.create(JOB_ID)).thenReturn(normalizer);
}
private void givenProviderReturnsNoBuckets(long startTime, long endTime) {
givenBuckets(startTime, endTime, Collections.emptyList());
}
private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> batch1, Deque<Bucket> batch2) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(batch1));
batches.add(new ArrayDeque<>(batch2));
givenBuckets(startTime, endTime, batches);
}
private void givenProviderReturnsBuckets(long startTime, long endTime, Deque<Bucket> buckets) {
List<Deque<Bucket>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(buckets));
givenBuckets(startTime, endTime, batches);
}
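// Each Deque in the batches list represents one page returned by the mocked batched documents iterator.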
private void givenBuckets(long startTime, long endTime, List<Deque<Bucket>> batches) {
BatchedDocumentsIterator<Bucket> iterator = new MockBatchedDocumentsIterator<>(startTime,
endTime, batches);
when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(iterator);
}
private void givenProviderReturnsNoInfluencers(long startTime, long endTime) {
givenProviderReturnsInfluencers(startTime, endTime, new ArrayDeque<>());
}
private void givenProviderReturnsInfluencers(long startTime, long endTime,
Deque<Influencer> influencers) {
List<Deque<Influencer>> batches = new ArrayList<>();
batches.add(new ArrayDeque<>(influencers));
BatchedDocumentsIterator<Influencer> iterator = new MockBatchedDocumentsIterator<>(
startTime, endTime, batches);
when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
}
private void verifyBucketWasUpdated(Bucket bucket) {
verify(jobRenormalizer).updateBucket(bucket);
}
private void verifyRecordsWereUpdated(String bucketId, List<AnomalyRecord> records) {
verify(jobRenormalizer).updateRecords(bucketId, records);
}
private void verifyBucketWasNotUpdated(Bucket bucket) {
verify(jobRenormalizer, never()).updateBucket(bucket);
}
private void verifyBucketRecordsWereNotUpdated(String bucketId) {
verify(jobRenormalizer, never()).updateRecords(eq(bucketId),
anyListOf(AnomalyRecord.class));
}
private static AnomalyRecord createRecordWithoutBigChange() {
return createRecord(false);
}
private static AnomalyRecord createRecordWithBigChange() {
return createRecord(true);
}
private static AnomalyRecord createRecord(boolean hadBigChange) {
AnomalyRecord anomalyRecord = mock(AnomalyRecord.class);
when(anomalyRecord.hadBigNormalizedUpdate()).thenReturn(hadBigChange);
when(anomalyRecord.getId()).thenReturn("someId");
return anomalyRecord;
}
private void verifyInfluencerWasUpdated(Influencer influencer) {
List<Influencer> list = new ArrayList<>();
list.add(influencer);
verify(jobRenormalizer).updateInfluencer(eq(JOB_ID), eq(list));
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer.output;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerResult;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class NormalizerResultHandlerTests extends ESTestCase {
private static final double EPSILON = 0.0000001;
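// The handler parses newline-delimited JSON, one normalizer result per line, as the native process writes it.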
public void testParse() throws IOException {
String testData = "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v1\","
+ "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ "\"value_field_name\":\"x\",\"probability\":0.01,\"normalized_score\":88.88}\n"
+ "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v2\","
+ "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ "\"value_field_name\":\"x\",\"probability\":0.02,\"normalized_score\":44.44}\n"
+ "{\"level\":\"leaf\",\"partition_field_name\":\"part\",\"partition_field_value\":\"v3\","
+ "\"person_field_name\":\"pers\",\"function_name\":\"f\","
+ "\"value_field_name\":\"x\",\"probability\":0.03,\"normalized_score\":22.22}\n";
InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, is);
handler.process();
List<NormalizerResult> results = handler.getNormalizedResults();
assertEquals(3, results.size());
assertEquals(88.88, results.get(0).getNormalizedScore(), EPSILON);
assertEquals(44.44, results.get(1).getNormalizedScore(), EPSILON);
assertEquals(22.22, results.get(2).getNormalizedScore(), EPSILON);
}
}

View File

@ -255,56 +255,56 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertEquals(bucket1.hashCode(), bucket2.hashCode());
}
public void testIsNormalisable_GivenNullBucketInfluencers() {
public void testIsNormalizable_GivenNullBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(null);
bucket.setAnomalyScore(90.0);
assertFalse(bucket.isNormalisable());
assertFalse(bucket.isNormalizable());
}
public void testIsNormalisable_GivenEmptyBucketInfluencers() {
public void testIsNormalizable_GivenEmptyBucketInfluencers() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setBucketInfluencers(Collections.emptyList());
bucket.setAnomalyScore(90.0);
assertFalse(bucket.isNormalisable());
assertFalse(bucket.isNormalizable());
}
public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(0);
assertFalse(bucket.isNormalisable());
assertFalse(bucket.isNormalizable());
}
public void testIsNormalisable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
public void testIsNormalizable_GivenAnomalyScoreIsZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(0.0);
bucket.setRecordCount(1);
assertTrue(bucket.isNormalisable());
assertTrue(bucket.isNormalizable());
}
public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(0);
assertTrue(bucket.isNormalisable());
assertTrue(bucket.isNormalizable());
}
public void testIsNormalisable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() {
public void testIsNormalizable_GivenAnomalyScoreIsNonZeroAndRecordCountIsNonZero() {
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.addBucketInfluencer(new BucketInfluencer("foo", new Date(123), 123, 1));
bucket.setAnomalyScore(1.0);
bucket.setRecordCount(1);
assertTrue(bucket.isNormalisable());
assertTrue(bucket.isNormalizable());
}
public void testPartitionAnomalyScore() {

View File

@ -53,10 +53,9 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
assertEquals(2, pProbs.size());
for (PerPartitionMaxProbabilities.PartitionProbability pProb : pProbs) {
if (pProb.getPartitionValue().equals("A")) {
assertEquals(40.0, pProb.getMaxNormalisedProbability(), 0.0001);
}
else {
assertEquals(90.0, pProb.getMaxNormalisedProbability(), 0.0001);
assertEquals(40.0, pProb.getMaxNormalizedProbability(), 0.0001);
} else {
assertEquals(90.0, pProb.getMaxNormalizedProbability(), 0.0001);
}
}
}
@ -81,4 +80,4 @@ public class PerPartitionMaxProbabilitiesTests extends AbstractSerializingTestCa
record.setNormalizedProbability(normalizedProbability);
return record;
}
}
}