Add back native process normalizer functionality (elastic/elasticsearch#540)

This builds on PR elastic/elasticsearch#526 to get normalization working end-to-end using the
native normalizer process.

The ShortCircuitingRenormalizer class does essentially what the old
BlockingQueueRenormaliser class did, but it uses the ES threadpool
instead of managing its own thread.
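
The gist of the short-circuiting approach, as a minimal generic sketch
(the class and method names below are invented for illustration and are
not the actual x-pack types): callers enqueue work items on a concurrent
deque and submit a drain task to the shared executor; a semaphore ensures
only one drain runs per worker at a time, and each drain collapses the
queue down to the newest item before doing the expensive work, so stale
items are skipped rather than processed.

    // Illustrative sketch of the short-circuiting pattern only; this is not the
    // actual ShortCircuitingRenormalizer, and the names here are invented.
    import java.util.Deque;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    public class ShortCircuitingWorker<T> {

        private final Deque<T> pending = new ConcurrentLinkedDeque<>();
        private final Semaphore onePermit = new Semaphore(1);
        private final ExecutorService executor;
        private final Consumer<T> work;

        public ShortCircuitingWorker(ExecutorService executor, Consumer<T> work) {
            this.executor = executor;
            this.work = work;
        }

        public void submit(T item) {
            // Enqueue first, then schedule a drain; if a drain is already running
            // it will see the new item before giving up its permit.
            pending.addLast(item);
            executor.execute(this::drain);
        }

        private void drain() {
            if (tryStart() == false) {
                return; // another drain holds the permit and will pick our item up
            }
            try {
                do {
                    T latest = pollLatest();
                    if (latest != null) {
                        work.accept(latest); // anything older has been discarded
                    }
                } while (tryFinish() == false);
            } catch (RuntimeException e) {
                onePermit.release(); // don't jam the worker if the work itself fails
                throw e;
            }
        }

        // Keep only the newest queued item; older ones are obsolete and dropped.
        private T pollLatest() {
            T latest = null;
            for (T item = pending.pollFirst(); item != null; item = pending.pollFirst()) {
                latest = item;
            }
            return latest;
        }

        private synchronized boolean tryStart() {
            return onePermit.tryAcquire();
        }

        private synchronized boolean tryFinish() {
            if (pending.isEmpty() == false) {
                return false; // more work arrived while we were busy; loop again
            }
            onePermit.release();
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            ShortCircuitingWorker<Integer> worker =
                    new ShortCircuitingWorker<>(pool, q -> System.out.println("renormalizing with state " + q));
            for (int i = 0; i < 1000; i++) {
                worker.submit(i); // most of these are skipped; only the newest state matters
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }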

Also fixed a bug where the C++ named the score field of partition_score
documents normalized_probability while the Java code expected it to be
called anomaly_score.

Original commit: elastic/x-pack-elasticsearch@d4cecae150
David Roberts 2016-12-14 16:06:01 +00:00 committed by GitHub
parent 4732223214
commit 0a45d846df
30 changed files with 402 additions and 215 deletions


@ -63,6 +63,7 @@ import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.NativeController;
import org.elasticsearch.xpack.prelert.job.process.ProcessCtrl;
@ -70,6 +71,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessF
import org.elasticsearch.xpack.prelert.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.prelert.job.process.autodetect.NativeAutodetectProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.prelert.job.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NativeNormalizerProcessFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.MultiplyingNormalizerProcess;
@ -157,6 +159,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
JobProvider jobProvider = new JobProvider(client, 0, parseFieldMatcherSupplier.getParseFieldMatcher());
JobRenormalizedResultsPersister jobRenormalizedResultsPersister = new JobRenormalizedResultsPersister(settings,
jobResultsPersister);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
@ -177,10 +181,12 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, perPartitionNormalization,
executorService) -> new MultiplyingNormalizerProcess(settings, 1.0);
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME));
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, autodetectProcessFactory,
clusterService.getClusterSettings());
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser,
autodetectProcessFactory, normalizerFactory, clusterService.getClusterSettings());
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
@ -266,15 +272,11 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
return env.configFile().resolve(NAME).resolve(name);
}
public static Path resolveLogFile(Environment env, String name) {
return env.logsFile().resolve(NAME).resolve(name);
}
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
FixedExecutorBuilder prelert = new FixedExecutorBuilder(settings, THREAD_POOL_NAME,
maxNumberOfJobs, 1000, "xpack.prelert.thread_pool");
maxNumberOfJobs * 2, 1000, "xpack.prelert.thread_pool");
// fail quick to run autodetect process / scheduler, so no queues
// 4 threads: for c++ logging, result processing, state processing and restore state


@ -24,6 +24,7 @@ import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.UsagePersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
@ -35,7 +36,10 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectR
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.normalizer.noop.NoOpRenormalizer;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.prelert.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.prelert.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -64,10 +68,12 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
private final JobProvider jobProvider;
private final AutodetectResultsParser parser;
private final AutodetectProcessFactory autodetectProcessFactory;
private final NormalizerFactory normalizerFactory;
private final UsagePersister usagePersister;
private final StateProcessor stateProcessor;
private final JobResultsPersister jobResultsPersister;
private final JobRenormalizedResultsPersister jobRenormalizedResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
@ -76,18 +82,22 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
AutodetectProcessFactory autodetectProcessFactory, ClusterSettings clusterSettings) {
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
ClusterSettings clusterSettings) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.maxAllowedRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
this.parser = parser;
this.autodetectProcessFactory = autodetectProcessFactory;
this.normalizerFactory = normalizerFactory;
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.jobResultsPersister = jobResultsPersister;
this.jobRenormalizedResultsPersister = jobRenormalizedResultsPersister;
this.stateProcessor = new StateProcessor(settings, jobResultsPersister);
this.usagePersister = new UsagePersister(settings, client);
this.jobDataCountsPersister = jobDataCountsPersister;
@ -175,12 +185,14 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
UsageReporter usageReporter = new UsageReporter(settings, job.getId(), usagePersister);
try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(), jobProvider.dataCounts(jobId),
usageReporter, jobDataCountsPersister)) {
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormalizer(), jobResultsPersister, parser);
ScoresUpdater scoresUpdator = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdator,
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME), job.getAnalysisConfig().getUsePerPartitionNormalization());
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, jobResultsPersister, parser);
AutodetectProcess process = null;
try {
process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime, executorService);
// TODO Port the normalizer from the old project
return new AutodetectCommunicator(executorService, job, process, statusReporter, processor, stateProcessor);
} catch (Exception e) {
try {


@ -153,6 +153,9 @@ public class ElasticsearchMappings {
.startObject(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(AnomalyRecord.ANOMALY_SCORE.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
@ -440,6 +443,9 @@ public class ElasticsearchMappings {
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(Quantiles.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()


@ -20,14 +20,13 @@ import java.util.List;
* for a particular job with new normalized anomaly scores and
* unusual scores.
*
* Renormalized results already have an ID having been indexed at least
* once before that same ID should be used on persistence
* Renormalized results must already have an ID.
*/
public class JobRenormalizer extends AbstractComponent {
public class JobRenormalizedResultsPersister extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
public JobRenormalizer(Settings settings, JobResultsPersister jobResultsPersister) {
public JobRenormalizedResultsPersister(Settings settings, JobResultsPersister jobResultsPersister) {
super(settings);
this.jobResultsPersister = jobResultsPersister;
}


@ -72,7 +72,7 @@ public class NativeController {
}
synchronized (commandStream) {
LOGGER.info("Starting process with command: " + command);
LOGGER.debug("Starting process with command: " + command);
commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8));
for (String arg : command) {
commandStream.write('\t');


@ -50,8 +50,9 @@ public class AutodetectCommunicator implements Closeable {
final AtomicReference<CountDownLatch> inUse = new AtomicReference<>();
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter,
AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) {
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process,
StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor,
StateProcessor stateProcessor) {
this.jobId = job.getId();
this.autodetectProcess = process;
this.statusReporter = statusReporter;
@ -63,7 +64,7 @@ public class AutodetectCommunicator implements Closeable {
autoDetectResultProcessor.process(jobId, process.getProcessOutStream(), usePerPartitionNormalization)
);
autoDetectExecutor.execute(() ->
stateProcessor.process(job.getId(), process.getPersistStream())
stateProcessor.process(jobId, process.getPersistStream())
);
this.autoDetectWriter = createProcessWriter(job, process, statusReporter);
}


@ -108,10 +108,10 @@ class NativeAutodetectProcess implements AutodetectProcess {
if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
}
LOGGER.info("[{}] Autodetect process exited", jobId);
LOGGER.debug("[{}] Autodetect process exited", jobId);
} catch (ExecutionException | TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process",
new Object[] { jobId }, e));
new Object[] { jobId }), e);
} catch (InterruptedException e) {
LOGGER.warn("[{}] Exception closing the running autodetect process", jobId);
Thread.currentThread().interrupt();


@ -7,7 +7,6 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect.output;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
@ -90,9 +89,9 @@ public class AutoDetectResultProcessor {
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", new Object[] {jobId}), e);
} finally {
completionLatch.countDown();
waitUntilRenormalizerIsIdle();
flushListener.clear();
renormalizer.shutdown();
completionLatch.countDown();
}
}
@ -149,12 +148,8 @@ public class AutoDetectResultProcessor {
if (quantiles != null) {
persister.persistQuantiles(quantiles);
LOGGER.debug("[{}] Quantiles parsed from output - will " + "trigger renormalization of scores", context.jobId);
if (context.isPerPartitionNormalization) {
renormalizer.renormalizeWithPartition(quantiles);
} else {
renormalizer.renormalize(quantiles);
}
LOGGER.debug("[{}] Quantiles parsed from output - will trigger renormalization of scores", context.jobId);
renormalizer.renormalize(quantiles);
}
FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
if (flushAcknowledgement != null) {


@ -1,25 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
public class NativeNormalizerFactory implements NormalizerFactory {
private final NormalizerProcessFactory processFactory;
private final ExecutorService executorService;
public NativeNormalizerFactory(NormalizerProcessFactory processFactory, ExecutorService executorService) {
this.processFactory = Objects.requireNonNull(processFactory);
this.executorService = Objects.requireNonNull(executorService);
}
@Override
public Normalizer create(String jobId) {
return new Normalizer(jobId, processFactory, executorService);
}
}


@ -72,7 +72,7 @@ class NativeNormalizerProcess implements NormalizerProcess {
if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
}
LOGGER.info("[{}] Normalizer process exited", jobId);
LOGGER.debug("[{}] Normalizer process exited", jobId);
} catch (ExecutionException | TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running normalizer process", new Object[] { jobId }), e);
} catch (InterruptedException e) {


@ -5,16 +5,20 @@
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
/**
* Factory interface for creating implementations of {@link Normalizer}
*/
public interface NormalizerFactory {
/**
* Create an implementation of {@link Normalizer}
*
* @param jobId The job ID
* @return The normalizer
*/
Normalizer create(String jobId);
public class NormalizerFactory {
private final NormalizerProcessFactory processFactory;
private final ExecutorService executorService;
public NormalizerFactory(NormalizerProcessFactory processFactory, ExecutorService executorService) {
this.processFactory = Objects.requireNonNull(processFactory);
this.executorService = Objects.requireNonNull(executorService);
}
public Normalizer create(String jobId) {
return new Normalizer(jobId, processFactory, executorService);
}
}


@ -15,20 +15,7 @@ public interface Renormalizer {
void renormalize(Quantiles quantiles);
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records and aggregate records to the partition
* level
*/
void renormalizeWithPartition(Quantiles quantiles);
/**
* Blocks until the renormalizer is idle and no further normalization tasks are pending.
* Blocks until the renormalizer is idle and no further quantiles updates are pending.
*/
void waitUntilIdle();
/**
* Shut down the renormalizer
*/
boolean shutdown();
}


@ -1,11 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
public interface RenormalizerFactory {
Renormalizer create(String jobId);
}


@ -11,11 +11,10 @@ import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PerPartitionMaxProbabilities;
import java.util.ArrayList;
import java.util.Deque;
@ -27,7 +26,7 @@ import java.util.stream.Collectors;
* Thread safe class that updates the scores of all existing results
* with the renormalized scores
*/
class ScoresUpdater {
public class ScoresUpdater {
private static final Logger LOGGER = Loggers.getLogger(ScoresUpdater.class);
/**
@ -45,16 +44,17 @@ class ScoresUpdater {
private final Job job;
private final JobProvider jobProvider;
private final JobRenormalizer updatesPersister;
private final JobRenormalizedResultsPersister updatesPersister;
private final NormalizerFactory normalizerFactory;
private int bucketSpan;
private long normalizationWindow;
private boolean perPartitionNormalization;
public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizer jobRenormalizer, NormalizerFactory normalizerFactory) {
public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
NormalizerFactory normalizerFactory) {
this.job = job;
this.jobProvider = Objects.requireNonNull(jobProvider);
updatesPersister = Objects.requireNonNull(jobRenormalizer);
updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister);
this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
bucketSpan = getBucketSpanOrDefault(job.getAnalysisConfig());
normalizationWindow = getNormalizationWindowOrDefault(job);
@ -181,7 +181,7 @@ class ScoresUpdater {
private void updateBucketIfItHasBigChange(Bucket bucket, int[] counts, boolean perPartitionNormalization) {
if (bucket.hadBigNormalizedUpdate()) {
if (perPartitionNormalization) {
updatesPersister.updatePerPartitionMaxProbabilities(bucket.getJobId(), bucket.getRecords());
updatesPersister.updatePerPartitionMaxProbabilities(job.getId(), bucket.getRecords());
}
updatesPersister.updateBucket(bucket);
@ -204,7 +204,7 @@ class ScoresUpdater {
}
if (!toUpdate.isEmpty()) {
updatesPersister.updateRecords(bucket.getId(), toUpdate);
updatesPersister.updateRecords(job.getId(), toUpdate);
}
}


@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
/**
* Renormalizer that discards outdated quantiles if even newer ones are received while waiting for a prior renormalization to complete.
*/
public class ShortCircuitingRenormalizer implements Renormalizer {
private static final Logger LOGGER = Loggers.getLogger(ShortCircuitingRenormalizer.class);
private final String jobId;
private final ScoresUpdater scoresUpdater;
private final ExecutorService executorService;
private final boolean isPerPartitionNormalization;
private final Deque<Quantiles> quantilesDeque = new ConcurrentLinkedDeque<>();
/**
* Each job may only have 1 normalization in progress at any time; the semaphore enforces this
*/
private final Semaphore semaphore = new Semaphore(1);
/**
* <code>null</code> means no normalization is in progress
*/
private CountDownLatch completionLatch;
public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService,
boolean isPerPartitionNormalization)
{
this.jobId = jobId;
this.scoresUpdater = scoresUpdater;
this.executorService = executorService;
this.isPerPartitionNormalization = isPerPartitionNormalization;
}
public synchronized void renormalize(Quantiles quantiles)
{
quantilesDeque.addLast(quantiles);
completionLatch = new CountDownLatch(1);
executorService.submit(() -> doRenormalizations());
}
public void waitUntilIdle()
{
try {
CountDownLatch latchToAwait = getCompletionLatch();
while (latchToAwait != null) {
latchToAwait.await();
latchToAwait = getCompletionLatch();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
private synchronized CountDownLatch getCompletionLatch() {
return completionLatch;
}
private Quantiles getEarliestQuantiles() {
return quantilesDeque.pollFirst();
}
private Quantiles getLatestQuantilesAndClear() {
// We discard all but the latest quantiles
Quantiles latestQuantiles = null;
for (Quantiles quantiles = quantilesDeque.pollFirst(); quantiles != null; quantiles = quantilesDeque.pollFirst()) {
latestQuantiles = quantiles;
}
return latestQuantiles;
}
private synchronized boolean tryStartWork() {
return semaphore.tryAcquire();
}
private synchronized boolean tryFinishWork() {
if (!quantilesDeque.isEmpty()) {
return false;
}
semaphore.release();
if (completionLatch != null) {
completionLatch.countDown();
completionLatch = null;
}
return true;
}
private synchronized void forceFinishWork() {
semaphore.release();
if (completionLatch != null) {
completionLatch.countDown();
}
}
private void doRenormalizations() {
// Exit immediately if another normalization is in progress. This means we don't hog threads.
if (tryStartWork() == false) {
return;
}
try {
do {
// Note that if there is only one set of quantiles in the queue then both these references will point to the same quantiles.
Quantiles earliestQuantiles = getEarliestQuantiles();
Quantiles latestQuantiles = getLatestQuantilesAndClear();
// We could end up with latestQuantiles being null if the thread running this method was
// preempted before the tryStartWork() call, another thread already running this method
// did the work and exited, and then this thread got true returned by tryStartWork().
if (latestQuantiles != null) {
// We could end up with earliestQuantiles being null if quantiles were
// added between getting the earliest and latest quantiles.
if (earliestQuantiles == null) {
earliestQuantiles = latestQuantiles;
}
long earliestBucketTimeMs = earliestQuantiles.getTimestamp().getTime();
long latestBucketTimeMs = latestQuantiles.getTimestamp().getTime();
// If we're going to skip quantiles, renormalize using the latest quantiles
// over the time ranges implied by all quantiles that were provided.
long windowExtensionMs = latestBucketTimeMs - earliestBucketTimeMs;
if (windowExtensionMs < 0) {
LOGGER.warn("[{}] Quantiles not supplied in order - {} after {}",
jobId, latestBucketTimeMs, earliestBucketTimeMs);
}
scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs,
isPerPartitionNormalization);
}
// Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false);
} catch (RuntimeException e) {
forceFinishWork();
throw e;
}
}
}
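
For orientation, the class above is constructed per job and driven from the
result processor, as the AutodetectProcessManager and AutoDetectResultProcessor
changes earlier in this commit show. Schematically (this reuses variables from
those changes and is not a standalone snippet):

    ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider,
            jobRenormalizedResultsPersister, normalizerFactory);
    Renormalizer renormalizer = new ShortCircuitingRenormalizer(job.getId(), scoresUpdater,
            threadPool.executor(PrelertPlugin.THREAD_POOL_NAME),
            job.getAnalysisConfig().getUsePerPartitionNormalization());

    renormalizer.renormalize(quantiles);   // queue the newest quantiles; stale ones may be skipped
    renormalizer.waitUntilIdle();          // block until no renormalization work remains pending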


@ -13,24 +13,12 @@ import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
* This should be removed when the normalizer code is ported
*/
public class NoOpRenormalizer implements Renormalizer {
// NORELEASE Remove once the normalizer code is ported
@Override
public void renormalize(Quantiles quantiles) {
}
@Override
public void renormalizeWithPartition(Quantiles quantiles) {
}
@Override
public void waitUntilIdle() {
}
@Override
public boolean shutdown() {
return true;
}
}


@ -46,32 +46,26 @@ public class Quantiles extends ToXContentToBytes implements Writeable {
PARSER.declareString(ConstructingObjectParser.constructorArg(), QUANTILE_STATE);
}
private String jobId;
private Date timestamp;
private String quantileState;
private final String jobId;
private final Date timestamp;
private final String quantileState;
public Quantiles(String jobId, Date timestamp, String quantilesState) {
public Quantiles(String jobId, Date timestamp, String quantileState) {
this.jobId = jobId;
this.timestamp = timestamp;
quantileState = quantilesState == null ? "" : quantilesState;
this.timestamp = Objects.requireNonNull(timestamp);
this.quantileState = Objects.requireNonNull(quantileState);
}
public Quantiles(StreamInput in) throws IOException {
jobId = in.readString();
if (in.readBoolean()) {
timestamp = new Date(in.readLong());
}
timestamp = new Date(in.readLong());
quantileState = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
boolean hasTimestamp = timestamp != null;
out.writeBoolean(hasTimestamp);
if (hasTimestamp) {
out.writeLong(timestamp.getTime());
}
out.writeLong(timestamp.getTime());
out.writeOptionalString(quantileState);
}


@ -306,6 +306,13 @@ public class Bucket extends ToXContentToBytes implements Writeable {
this.perPartitionMaxProbability = perPartitionMaxProbability;
}
public double partitionInitialAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();
return first.isPresent() ? first.get().getInitialAnomalyScore() : 0.0;
}
public double partitionAnomalyScore(String partitionValue) {
Optional<PartitionScore> first = partitionScores.stream().filter(s -> partitionValue.equals(s.getPartitionFieldValue()))
.findFirst();


@ -19,26 +19,30 @@ import java.util.Objects;
public class PartitionScore extends ToXContentToBytes implements Writeable {
public static final ParseField PARTITION_SCORE = new ParseField("partition_score");
private String partitionFieldValue;
private String partitionFieldName;
private final String partitionFieldValue;
private final String partitionFieldName;
private final double initialAnomalyScore;
private double anomalyScore;
private double probability;
private boolean hadBigNormalizedUpdate;
public static final ConstructingObjectParser<PartitionScore, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
PARTITION_SCORE.getPreferredName(), a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3]));
PARTITION_SCORE.getPreferredName(), a -> new PartitionScore((String) a[0], (String) a[1], (Double) a[2], (Double) a[3],
(Double) a[4]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_NAME);
PARSER.declareString(ConstructingObjectParser.constructorArg(), AnomalyRecord.PARTITION_FIELD_VALUE);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), Bucket.INITIAL_ANOMALY_SCORE);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.ANOMALY_SCORE);
PARSER.declareDouble(ConstructingObjectParser.constructorArg(), AnomalyRecord.PROBABILITY);
}
public PartitionScore(String fieldName, String fieldValue, double anomalyScore, double probability) {
public PartitionScore(String fieldName, String fieldValue, double initialAnomalyScore, double anomalyScore, double probability) {
hadBigNormalizedUpdate = false;
partitionFieldName = fieldName;
partitionFieldValue = fieldValue;
this.initialAnomalyScore = initialAnomalyScore;
this.anomalyScore = anomalyScore;
this.probability = probability;
}
@ -46,6 +50,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
public PartitionScore(StreamInput in) throws IOException {
partitionFieldName = in.readString();
partitionFieldValue = in.readString();
initialAnomalyScore = in.readDouble();
anomalyScore = in.readDouble();
probability = in.readDouble();
}
@ -54,6 +59,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(partitionFieldName);
out.writeString(partitionFieldValue);
out.writeDouble(initialAnomalyScore);
out.writeDouble(anomalyScore);
out.writeDouble(probability);
}
@ -63,12 +69,17 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
builder.startObject();
builder.field(AnomalyRecord.PARTITION_FIELD_NAME.getPreferredName(), partitionFieldName);
builder.field(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue);
builder.field(Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);
builder.field(AnomalyRecord.ANOMALY_SCORE.getPreferredName(), anomalyScore);
builder.field(AnomalyRecord.PROBABILITY.getPreferredName(), probability);
builder.endObject();
return builder;
}
public double getInitialAnomalyScore() {
return initialAnomalyScore;
}
public double getAnomalyScore() {
return anomalyScore;
}
@ -81,18 +92,10 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
return partitionFieldName;
}
public void setPartitionFieldName(String partitionFieldName) {
this.partitionFieldName = partitionFieldName;
}
public String getPartitionFieldValue() {
return partitionFieldValue;
}
public void setPartitionFieldValue(String partitionFieldValue) {
this.partitionFieldValue = partitionFieldValue;
}
public double getProbability() {
return probability;
}
@ -103,7 +106,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
return Objects.hash(partitionFieldName, partitionFieldValue, probability, anomalyScore);
return Objects.hash(partitionFieldName, partitionFieldValue, probability, initialAnomalyScore, anomalyScore);
}
@Override
@ -122,7 +125,7 @@ public class PartitionScore extends ToXContentToBytes implements Writeable {
// as is id, which is generated by the datastore
return Objects.equals(this.partitionFieldValue, that.partitionFieldValue)
&& Objects.equals(this.partitionFieldName, that.partitionFieldName) && (this.probability == that.probability)
&& (this.anomalyScore == that.anomalyScore);
&& (this.initialAnomalyScore == that.initialAnomalyScore) && (this.anomalyScore == that.anomalyScore);
}
public boolean hadBigNormalizedUpdate() {


@ -66,7 +66,7 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
List<PartitionScore> partitionScores = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partitionScores.add(new PartitionScore(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
randomDouble(), randomDouble()));
randomDouble(), randomDouble(), randomDouble()));
}
bucket.setPartitionScores(partitionScores);
}


@ -57,14 +57,14 @@ import java.util.Set;
public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private static final String JOB_ID = "foo";
private Renormalizer renormaliser;
private Renormalizer renormalizer;
private JobResultsPersister jobResultsPersister;
private AutodetectResultsParser autodetectResultsParser;
private JobProvider jobProvider;
@Before
private void createComponents() {
renormaliser = new NoOpRenormalizer();
renormalizer = new NoOpRenormalizer();
jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
ParseFieldMatcher matcher = new ParseFieldMatcher(nodeSettings());
autodetectResultsParser = new AutodetectResultsParser(nodeSettings(), () -> matcher);
@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -150,7 +150,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
@ -192,7 +192,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
createJob();
AutoDetectResultProcessor resultProcessor =
new AutoDetectResultProcessor(renormaliser, jobResultsPersister, autodetectResultsParser);
new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);


@ -26,6 +26,7 @@ import org.elasticsearch.xpack.prelert.job.data.DataProcessor;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
@ -34,6 +35,7 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectR
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.prelert.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.junit.Before;
import org.mockito.Mockito;
@ -74,14 +76,18 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private JobManager jobManager;
private JobProvider jobProvider;
private JobResultsPersister jobResultsPersister;
private JobRenormalizedResultsPersister jobRenormalizedResultsPersister;
private JobDataCountsPersister jobDataCountsPersister;
private NormalizerFactory normalizerFactory;
@Before
public void initMocks() {
jobManager = mock(JobManager.class);
jobProvider = mock(JobProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithStatus(JobStatus.OPENED);
}
@ -136,8 +142,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
AutodetectProcessManager manager = new AutodetectProcessManager(settings.build(), client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings);
manager.openJob("foo", false);
manager.openJob("bar", false);
@ -283,8 +290,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings);
expectThrows(EsRejectedExecutionException.class, () -> manager.create("_id", false));
verify(autodetectProcess, times(1)).close();
@ -309,8 +317,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingSet.add(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager,
jobProvider, jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, clusterSettings);
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory,
normalizerFactory, clusterSettings);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), anyBoolean());
return manager;


@ -56,7 +56,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, parser);
processor.process(JOB_ID, mock(InputStream.class), randomBoolean());
verify(renormalizer, times(1)).shutdown();
verify(renormalizer, times(1)).waitUntilIdle();
assertEquals(0, processor.completionLatch.getCount());
}
@ -307,24 +307,4 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);
}
public void testProcessResult_quantiles_isPerPartitionNormalization() {
Renormalizer renormalizer = mock(Renormalizer.class);
JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, true, bulkBuilder);
context.deleteInterimRequired = false;
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
processor.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(renormalizer, times(1)).renormalizeWithPartition(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);
}
}


@ -35,7 +35,7 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "{\"sequence_num\":1,\"timestamp\":1359450000000,\"bucket_span\":22,\"job_id\":\"foo\",\"anomaly_score\":0,"
+ "\"probability\":0.0, \"influencer_field_name\":\"bucket_time\","
+ "\"initial_anomaly_score\":0.0}]}},{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.1, normalizer 2" +
".1]\"}}"
".1]\",\"timestamp\":1359450000000}}"
+ ",{\"bucket\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"bucket_span\":22,\"records\":"
+ "[{\"timestamp\":1359453600000,\"bucket_span\":22,\"sequence_num\":1,\"job_id\":\"foo\",\"probability\":0.0637541,"
+ "\"by_field_name\":\"airline\",\"by_field_value\":\"JZA\", \"typical\":[1020.08],\"actual\":[1042.14],"
@ -57,9 +57,9 @@ public class AutodetectResultsParserTests extends ESTestCase {
+ "\"initial_anomaly_score\":20.22688,\"anomaly_score\":20.22688} ,{\"timestamp\":1359453600000,\"bucket_span\":22,"
+ "\"sequence_num\":6,\"job_id\":\"foo\",\"raw_anomaly_score\":0.005, \"probability\":0.03,"
+ "\"influencer_field_name\":\"foo\",\"initial_anomaly_score\":10.5,\"anomaly_score\":10.5}]}},{\"quantiles\": "
+ "{\"job_id\":\"foo\","
+ "{\"job_id\":\"foo\",\"timestamp\":1359453600000,"
+ "\"quantile_state\":\"[normalizer 1.2, normalizer 2.2]\"}} ,{\"flush\": {\"id\":\"testing1\"}} ,"
+ "{\"quantiles\": {\"job_id\":\"foo\", \"quantile_state\":\"[normalizer 1.3, normalizer 2.3]\"}} ]";
+ "{\"quantiles\": {\"job_id\":\"foo\",\"timestamp\":1359453600000,\"quantile_state\":\"[normalizer 1.3, normalizer 2.3]\"}} ]";
public static final String POPULATION_OUTPUT_SAMPLE = "[{\"timestamp\":1379590200,\"records\":[{\"probability\":1.38951e-08,"
+ "\"field_name\":\"sum_cs_bytes_\",\"over_field_name\":\"cs_host\",\"over_field_value\":\"mail.google.com\","
@ -314,13 +314,13 @@ public class AutodetectResultsParserTests extends ESTestCase {
.collect(Collectors.toList());
assertEquals(3, quantiles.size());
assertEquals("foo", quantiles.get(0).getJobId());
assertNull(quantiles.get(0).getTimestamp());
assertEquals(new Date(1359450000000L), quantiles.get(0).getTimestamp());
assertEquals("[normalizer 1.1, normalizer 2.1]", quantiles.get(0).getQuantileState());
assertEquals("foo", quantiles.get(1).getJobId());
assertNull(quantiles.get(1).getTimestamp());
assertEquals(new Date(1359453600000L), quantiles.get(1).getTimestamp());
assertEquals("[normalizer 1.2, normalizer 2.2]", quantiles.get(1).getQuantileState());
assertEquals("foo", quantiles.get(2).getJobId());
assertNull(quantiles.get(2).getTimestamp());
assertEquals(new Date(1359453600000L), quantiles.get(2).getTimestamp());
assertEquals("[normalizer 1.3, normalizer 2.3]", quantiles.get(2).getQuantileState());
}


@ -48,8 +48,8 @@ public class BucketNormalizableTests extends ESTestCase {
bucket.setRecords(Arrays.asList(record1, record2));
List<PartitionScore> partitionScores = new ArrayList<>();
partitionScores.add(new PartitionScore("pf1", "pv1", 0.2, 0.1));
partitionScores.add(new PartitionScore("pf1", "pv2", 0.4, 0.01));
partitionScores.add(new PartitionScore("pf1", "pv1", 0.3, 0.2, 0.1));
partitionScores.add(new PartitionScore("pf1", "pv2", 0.5, 0.4, 0.01));
bucket.setPartitionScores(partitionScores);
}


@ -47,26 +47,27 @@ public class NormalizerTests extends ESTestCase {
public void testNormalize() throws IOException {
ExecutorService threadpool = Executors.newScheduledThreadPool(1);
try {
NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false),
any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class);
when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), eq(false),
any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR));
Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool);
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
Bucket bucket = generateBucket(new Date(0));
bucket.setAnomalyScore(0.0);
bucket.addBucketInfluencer(createTimeBucketInfluencer(bucket.getTimestamp(), 0.07, INITIAL_SCORE));
Deque<Bucket> buckets = new ArrayDeque<>();
buckets.add(bucket);
List<Normalizable> asNormalizables = buckets.stream()
.map(b -> new BucketNormalizable(b)).collect(Collectors.toList());
List<Normalizable> asNormalizables = buckets.stream()
.map(b -> new BucketNormalizable(b)).collect(Collectors.toList());
normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
normalizer.normalize(BUCKET_SPAN, false, asNormalizables, QUANTILES_STATE);
threadpool.shutdown();
assertEquals(1, asNormalizables.size());
assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001);
assertEquals(1, asNormalizables.size());
assertEquals(FACTOR * INITIAL_SCORE, asNormalizables.get(0).getNormalizedScore(), 0.0001);
} finally {
threadpool.shutdown();
}
}
}


@ -28,7 +28,7 @@ import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.BatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizer;
import org.elasticsearch.xpack.prelert.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.MockBatchedDocumentsIterator;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
@ -45,7 +45,7 @@ public class ScoresUpdaterTests extends ESTestCase {
private static final long DEFAULT_END_TIME = 3600;
private JobProvider jobProvider = mock(JobProvider.class);
private JobRenormalizer jobRenormalizer = mock(JobRenormalizer.class);
private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class);
private Normalizer normalizer = mock(Normalizer.class);
private NormalizerFactory normalizerFactory = mock(NormalizerFactory.class);
@ -70,7 +70,7 @@ public class ScoresUpdaterTests extends ESTestCase {
job = jobBuilder.build();
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizer, normalizerFactory);
scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
givenProviderReturnsNoBuckets(DEFAULT_START_TIME, DEFAULT_END_TIME);
givenProviderReturnsNoInfluencers(DEFAULT_START_TIME, DEFAULT_END_TIME);
@ -186,7 +186,7 @@ public class ScoresUpdaterTests extends ESTestCase {
verifyNormalizerWasInvoked(1);
verifyBucketWasNotUpdated(bucket);
verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3));
}
public void testUpdate_GivenSingleBucketWithBigChangeAndSomeRecordsWithBigChange() throws IOException {
@ -212,7 +212,7 @@ public class ScoresUpdaterTests extends ESTestCase {
verifyNormalizerWasInvoked(1);
verifyBucketWasUpdated(bucket);
verifyRecordsWereUpdated(bucket.getId(), Arrays.asList(record1, record3));
verifyRecordsWereUpdated(bucket.getJobId(), Arrays.asList(record1, record3));
}
public void testUpdate_GivenEnoughBucketsForTwoBatchesButOneNormalization() throws IOException {
@ -409,19 +409,19 @@ public class ScoresUpdaterTests extends ESTestCase {
}
private void verifyBucketWasUpdated(Bucket bucket) {
verify(jobRenormalizer).updateBucket(bucket);
verify(jobRenormalizedResultsPersister).updateBucket(bucket);
}
private void verifyRecordsWereUpdated(String bucketId, List<AnomalyRecord> records) {
verify(jobRenormalizer).updateRecords(bucketId, records);
verify(jobRenormalizedResultsPersister).updateRecords(bucketId, records);
}
private void verifyBucketWasNotUpdated(Bucket bucket) {
verify(jobRenormalizer, never()).updateBucket(bucket);
verify(jobRenormalizedResultsPersister, never()).updateBucket(bucket);
}
private void verifyBucketRecordsWereNotUpdated(String bucketId) {
verify(jobRenormalizer, never()).updateRecords(eq(bucketId),
verify(jobRenormalizedResultsPersister, never()).updateRecords(eq(bucketId),
anyListOf(AnomalyRecord.class));
}
@ -443,6 +443,6 @@ public class ScoresUpdaterTests extends ESTestCase {
private void verifyInfluencerWasUpdated(Influencer influencer) {
List<Influencer> list = new ArrayList<>();
list.add(influencer);
verify(jobRenormalizer).updateInfluencer(eq(JOB_ID), eq(list));
verify(jobRenormalizedResultsPersister).updateInfluencer(eq(JOB_ID), eq(list));
}
}


@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.process.normalizer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.mockito.ArgumentCaptor;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class ShortCircuitingRenormalizerTests extends ESTestCase {
private static final String JOB_ID = "foo";
// Never reduce this below 4, otherwise some of the logic in the test will break
private static final int TEST_SIZE = 1000;
public void testNormalize() {
ExecutorService threadpool = Executors.newScheduledThreadPool(10);
try {
ScoresUpdater scoresUpdater = mock(ScoresUpdater.class);
boolean isPerPartitionNormalization = randomBoolean();
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool,
isPerPartitionNormalization);
// Blast through many sets of quantiles in quick succession, faster than the normalizer can process them
for (int i = 1; i < TEST_SIZE / 2; ++i) {
Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i));
renormalizer.renormalize(quantiles);
}
renormalizer.waitUntilIdle();
for (int i = TEST_SIZE / 2; i <= TEST_SIZE; ++i) {
Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i));
renormalizer.renormalize(quantiles);
}
renormalizer.waitUntilIdle();
ArgumentCaptor<String> stateCaptor = ArgumentCaptor.forClass(String.class);
verify(scoresUpdater, atLeastOnce()).update(stateCaptor.capture(), anyLong(), anyLong(), eq(isPerPartitionNormalization));
List<String> quantilesUsed = stateCaptor.getAllValues();
assertFalse(quantilesUsed.isEmpty());
assertTrue(quantilesUsed.size() < TEST_SIZE);
// Last quantiles state that was actually used must be the last quantiles state we supplied
assertEquals(Integer.toString(TEST_SIZE), quantilesUsed.get(quantilesUsed.size() - 1));
// Earlier quantiles states that were processed must have been processed in the supplied order
int previous = 0;
for (String state : quantilesUsed) {
int current = Integer.parseInt(state);
assertTrue("Out of sequence states were " + previous + " and " + current + " in " + quantilesUsed, current > previous);
previous = current;
}
// The quantiles immediately before the intermediate wait for idle must have been processed
int intermediateWaitPoint = TEST_SIZE / 2 - 1;
assertTrue(quantilesUsed + " should contain " + intermediateWaitPoint,
quantilesUsed.contains(Integer.toString(intermediateWaitPoint)));
} finally {
threadpool.shutdown();
}
}
}


@ -61,7 +61,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
List<PartitionScore> partitionScores = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
partitionScores.add(new PartitionScore(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomDouble(),
randomDouble()));
randomDouble(), randomDouble()));
}
bucket.setPartitionScores(partitionScores);
}
@ -306,20 +306,28 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
public void testPartitionAnomalyScore() {
List<PartitionScore> pScore = new ArrayList<>();
pScore.add(new PartitionScore("pf", "pv1", 10, 0.1));
pScore.add(new PartitionScore("pf", "pv3", 50, 0.1));
pScore.add(new PartitionScore("pf", "pv4", 60, 0.1));
pScore.add(new PartitionScore("pf", "pv2", 40, 0.1));
pScore.add(new PartitionScore("pf", "pv1", 11.0, 10.0, 0.1));
pScore.add(new PartitionScore("pf", "pv3", 51.0, 50.0, 0.1));
pScore.add(new PartitionScore("pf", "pv4", 61.0, 60.0, 0.1));
pScore.add(new PartitionScore("pf", "pv2", 41.0, 40.0, 0.1));
Bucket bucket = new Bucket("foo", new Date(123), 123);
bucket.setPartitionScores(pScore);
double initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv1");
assertEquals(11.0, initialAnomalyScore, 0.001);
double anomalyScore = bucket.partitionAnomalyScore("pv1");
assertEquals(10.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv2");
assertEquals(41.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv2");
assertEquals(40.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv3");
assertEquals(51.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv3");
assertEquals(50.0, anomalyScore, 0.001);
initialAnomalyScore = bucket.partitionInitialAnomalyScore("pv4");
assertEquals(61.0, initialAnomalyScore, 0.001);
anomalyScore = bucket.partitionAnomalyScore("pv4");
assertEquals(60.0, anomalyScore, 0.001);
}


@ -14,7 +14,8 @@ public class PartitionScoreTests extends AbstractSerializingTestCase<PartitionSc
@Override
protected PartitionScore createTestInstance() {
return new PartitionScore(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomDouble(), randomDouble());
return new PartitionScore(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20), randomDouble(), randomDouble(),
randomDouble());
}
@Override