diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 8d3fdab1797..b1c2ebb0528 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -261,13 +261,14 @@ public class AutoDetectResultProcessor { } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { + LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", context.jobId, quantiles.getTimestamp()); persister.persistQuantiles(quantiles); context.bulkResultsPersister.executeRequest(); - if (processKilled == false) { + if (processKilled == false && renormalizer.isEnabled()) { // We need to make all results written up to these quantiles available for renormalization persister.commitResultWrites(context.jobId); - LOGGER.debug("[{}] Quantiles parsed from output - will trigger renormalization of scores", context.jobId); + LOGGER.debug("[{}] Quantiles queued for renormalization", context.jobId); renormalizer.renormalize(quantiles); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java index e94939d2dab..f8d7854c434 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java @@ -8,6 +8,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; public interface Renormalizer { + + /** + * Is renormalization enabled? + * @return {@code true} if renormalization is enabled or {@code false} otherwise + */ + boolean isEnabled(); + /** * Update the anomaly score field on all previously persisted buckets * and all contained records diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index bfb1f852d31..7d689046209 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -202,4 +202,8 @@ public class ScoresUpdater { updatesPersister.updateResults(toUpdate); } } + + long getNormalizationWindow() { + return normalizationWindow; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java index 411ad7c2d4b..b11db903d6d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java @@ -42,8 +42,17 @@ public class ShortCircuitingRenormalizer implements Renormalizer { this.isPerPartitionNormalization = isPerPartitionNormalization; } + @Override + public boolean isEnabled() { + return scoresUpdater.getNormalizationWindow() > 0; + } + @Override public void renormalize(Quantiles quantiles) { + if (!isEnabled()) { + return; + } + // This will throw NPE if quantiles is null, so do it first QuantilesWithLatch quantilesWithLatch = new QuantilesWithLatch(quantiles, new CountDownLatch(1)); // Needed to ensure work is not added while the tryFinishWork() method is running diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/noop/NoOpRenormalizer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/noop/NoOpRenormalizer.java deleted file mode 100644 index 8684e982ab6..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/noop/NoOpRenormalizer.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.process.normalizer.noop; - -import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; - -/** - * A {@link Renormalizer} implementation that does absolutely nothing - * This should be removed when the normalizer code is ported - */ -public class NoOpRenormalizer implements Renormalizer { - - @Override - public void renormalize(Quantiles quantiles) { - } - - @Override - public void waitUntilIdle() { - } - - @Override - public void shutdown() { - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java index 74b619f8e03..91a6ae0d910 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherClientHelper.java @@ -41,6 +41,7 @@ public class WatcherClientHelper { } else { try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { Map filteredHeaders = watch.status().getHeaders().entrySet().stream() + .filter(Watcher.HEADER_FILTERS::contains) .filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 0cb83c7cb7c..05475e7b9ab 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledge import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer; +import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.job.results.Bucket; @@ -64,6 +64,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { @@ -72,6 +74,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { private JobProvider jobProvider; private List capturedUpdateModelSnapshotOnJobRequests; private AutoDetectResultProcessor resultProcessor; + private Renormalizer renormalizer; @Override protected Settings nodeSettings() { @@ -94,8 +97,9 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { Settings.Builder builder = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); jobProvider = new JobProvider(client(), builder.build()); + renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); - resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, new NoOpRenormalizer(), + resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) { @Override protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) { @@ -170,6 +174,38 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase { assertEquals(quantiles, persistedQuantiles.get()); } + public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception { + when(renormalizer.isEnabled()).thenReturn(true); + + ResultsBuilder builder = new ResultsBuilder(); + Quantiles quantiles = createQuantiles(); + builder.addQuantiles(quantiles); + + resultProcessor.process(builder.buildTestProcess()); + resultProcessor.awaitCompletion(); + + Optional persistedQuantiles = getQuantiles(); + assertTrue(persistedQuantiles.isPresent()); + assertEquals(quantiles, persistedQuantiles.get()); + verify(renormalizer).renormalize(quantiles); + } + + public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception { + when(renormalizer.isEnabled()).thenReturn(false); + + ResultsBuilder builder = new ResultsBuilder(); + Quantiles quantiles = createQuantiles(); + builder.addQuantiles(quantiles); + + resultProcessor.process(builder.buildTestProcess()); + resultProcessor.awaitCompletion(); + + Optional persistedQuantiles = getQuantiles(); + assertTrue(persistedQuantiles.isPresent()); + assertEquals(quantiles, persistedQuantiles.get()); + verify(renormalizer, never()).renormalize(quantiles); + } + public void testDeleteInterimResults() throws Exception { Bucket nonInterimBucket = createBucket(false); Bucket interimBucket = createBucket(true); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index d631e79c7fb..019535c7cff 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; @@ -37,11 +38,45 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { cleanUp(); } - public void test() throws Exception { + public void testDefaultRenormalization() throws Exception { + String jobId = "basic-renormalization-it-test-default-renormalization-job"; + createAndRunJob(jobId, null); + + List records = getRecords(jobId); + assertThat(records.size(), equalTo(2)); + AnomalyRecord laterRecord = records.get(0); + assertThat(laterRecord.getActual().get(0), equalTo(100.0)); + AnomalyRecord earlierRecord = records.get(1); + assertThat(earlierRecord.getActual().get(0), equalTo(10.0)); + assertThat(laterRecord.getRecordScore(), greaterThan(earlierRecord.getRecordScore())); + + // This is the key assertion: if renormalization never happened then the record_score would + // be the same as the initial_record_score on the anomaly record that happened earlier + assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore())); + + // Since this job ran for 50 buckets, it's a good place to assert + // that established model memory matches model memory in the job stats + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(jobId).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + } + + public void testRenormalizationDisabled() throws Exception { + String jobId = "basic-renormalization-it-test-renormalization-disabled-job"; + createAndRunJob(jobId, 0L); + + List records = getRecords(jobId); + for (AnomalyRecord record : records) { + assertThat(record.getInitialRecordScore(), equalTo(record.getRecordScore())); + } + } + + private void createAndRunJob(String jobId, Long renormalizationWindow) throws Exception { TimeValue bucketSpan = TimeValue.timeValueHours(1); long startTime = 1491004800000L; - Job.Builder job = buildAndRegisterJob("basic-renormalization-it-job", bucketSpan); + Job.Builder job = buildAndRegisterJob(jobId, bucketSpan, renormalizationWindow); openJob(job.getId()); postData(job.getId(), generateData(startTime, bucketSpan, 50, bucketIndex -> { @@ -56,28 +91,9 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { } }).stream().collect(Collectors.joining())); closeJob(job.getId()); - - List records = getRecords(job.getId()); - assertThat(records.size(), equalTo(2)); - AnomalyRecord laterRecord = records.get(0); - assertThat(laterRecord.getActual().get(0), equalTo(100.0)); - AnomalyRecord earlierRecord = records.get(1); - assertThat(earlierRecord.getActual().get(0), equalTo(10.0)); - assertThat(laterRecord.getRecordScore(), greaterThan(earlierRecord.getRecordScore())); - - // This is the key assertion: if renormalization never happened then the record_score would - // be the same as the initial_record_score on the anomaly record that happened earlier - assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore())); - - // Since this job ran for 50 buckets, it's a good place to assert - // that established model memory matches model memory in the job stats - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); } - private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception { + private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) throws Exception { Detector.Builder detector = new Detector.Builder("count", null); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build())); analysisConfig.setBucketSpan(bucketSpan); @@ -85,6 +101,9 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { job.setAnalysisConfig(analysisConfig); DataDescription.Builder dataDescription = new DataDescription.Builder(); job.setDataDescription(dataDescription); + if (renormalizationWindow != null) { + job.setRenormalizationWindowDays(renormalizationWindow); + } registerJob(job); putJob(job); return job; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 804573f11e1..c55da2598a2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -325,7 +325,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verifyNoMoreInteractions(persister); } - public void testProcessResult_quantiles() { + public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); @@ -333,16 +333,36 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutodetectResult result = mock(AutodetectResult.class); Quantiles quantiles = mock(Quantiles.class); when(result.getQuantiles()).thenReturn(quantiles); + when(renormalizer.isEnabled()).thenReturn(true); processorUnderTest.processResult(context, result); verify(persister, times(1)).persistQuantiles(quantiles); verify(bulkBuilder).executeRequest(); verify(persister).commitResultWrites(JOB_ID); + verify(renormalizer, times(1)).isEnabled(); verify(renormalizer, times(1)).renormalize(quantiles); verifyNoMoreInteractions(persister); verifyNoMoreInteractions(renormalizer); } + public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + context.deleteInterimRequired = false; + AutodetectResult result = mock(AutodetectResult.class); + Quantiles quantiles = mock(Quantiles.class); + when(result.getQuantiles()).thenReturn(quantiles); + when(renormalizer.isEnabled()).thenReturn(false); + processorUnderTest.processResult(context, result); + + verify(persister, times(1)).persistQuantiles(quantiles); + verify(bulkBuilder).executeRequest(); + verify(renormalizer, times(1)).isEnabled(); + verifyNoMoreInteractions(persister); + verifyNoMoreInteractions(renormalizer); + } + public void testAwaitCompletion() throws TimeoutException { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java index 81849486587..cdc2fce6822 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles; +import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Date; @@ -15,23 +16,33 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ShortCircuitingRenormalizerTests extends ESTestCase { + private static final String JOB_ID = "foo"; + // Never reduce this below 4, otherwise some of the logic in the test will break private static final int TEST_SIZE = 1000; + private ScoresUpdater scoresUpdater; + + @Before + public void setUpMocks() { + scoresUpdater = mock(ScoresUpdater.class); + when(scoresUpdater.getNormalizationWindow()).thenReturn(30L); + } + public void testNormalize() throws InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(10); try { - ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); - boolean isPerPartitionNormalization = randomBoolean(); ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool, @@ -76,4 +87,20 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase { } assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS)); } + + public void testIsEnabled_GivenNormalizationWindowIsZero() { + ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); + when(scoresUpdater.getNormalizationWindow()).thenReturn(0L); + ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean()); + + assertThat(renormalizer.isEnabled(), is(false)); + } + + public void testIsEnabled_GivenNormalizationWindowGreaterThanZero() { + ScoresUpdater scoresUpdater = mock(ScoresUpdater.class); + when(scoresUpdater.getNormalizationWindow()).thenReturn(1L); + ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean()); + + assertThat(renormalizer.isEnabled(), is(true)); + } }