[ML] Avoid all overhead when renormalization window is zero (elastic/x-pack-elasticsearch#3255)

relates elastic/x-pack-elasticsearch#3244

Original commit: elastic/x-pack-elasticsearch@fe41c23ad7
This commit is contained in:
Dimitris Athanasiou 2017-12-08 12:22:52 +00:00 committed by GitHub
parent 3f17c28f9b
commit 434dc94eb2
10 changed files with 153 additions and 57 deletions

View File

@ -261,13 +261,14 @@ public class AutoDetectResultProcessor {
}
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {
LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", context.jobId, quantiles.getTimestamp());
persister.persistQuantiles(quantiles);
context.bulkResultsPersister.executeRequest();
if (processKilled == false) {
if (processKilled == false && renormalizer.isEnabled()) {
// We need to make all results written up to these quantiles available for renormalization
persister.commitResultWrites(context.jobId);
LOGGER.debug("[{}] Quantiles parsed from output - will trigger renormalization of scores", context.jobId);
LOGGER.debug("[{}] Quantiles queued for renormalization", context.jobId);
renormalizer.renormalize(quantiles);
}
}

View File

@ -8,6 +8,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
public interface Renormalizer {
/**
* Is renormalization enabled?
* @return {@code true} if renormalization is enabled or {@code false} otherwise
*/
boolean isEnabled();
/**
* Update the anomaly score field on all previously persisted buckets
* and all contained records

View File

@ -202,4 +202,8 @@ public class ScoresUpdater {
updatesPersister.updateResults(toUpdate);
}
}
/**
 * Exposes the normalization window held by this updater.
 * NOTE(review): the unit is not visible from here - presumably days; confirm against the job configuration.
 *
 * @return the current value of the {@code normalizationWindow} field
 */
long getNormalizationWindow() {
    return this.normalizationWindow;
}
}

View File

@ -42,8 +42,17 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
this.isPerPartitionNormalization = isPerPartitionNormalization;
}
/**
 * Reports whether renormalization is enabled, which is the case only when
 * the scores updater has a strictly positive normalization window.
 *
 * @return {@code true} if the normalization window is greater than zero
 */
@Override
public boolean isEnabled() {
    final long normalizationWindow = scoresUpdater.getNormalizationWindow();
    return normalizationWindow > 0;
}
@Override
public void renormalize(Quantiles quantiles) {
if (!isEnabled()) {
return;
}
// This will throw NPE if quantiles is null, so do it first
QuantilesWithLatch quantilesWithLatch = new QuantilesWithLatch(quantiles, new CountDownLatch(1));
// Needed to ensure work is not added while the tryFinishWork() method is running

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process.normalizer.noop;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
/**
* A {@link Renormalizer} implementation that does absolutely nothing:
* every method below has an empty body.
* This should be removed when the normalizer code is ported.
*/
public class NoOpRenormalizer implements Renormalizer {
/**
* Does nothing; the supplied quantiles are ignored.
*/
@Override
public void renormalize(Quantiles quantiles) {
// intentionally empty
}
/**
* Returns immediately, as this implementation never performs any work.
*/
@Override
public void waitUntilIdle() {
// intentionally empty
}
/**
* Does nothing; this implementation does no work in shutdown.
*/
@Override
public void shutdown() {
// intentionally empty
}
}

View File

@ -41,6 +41,7 @@ public class WatcherClientHelper {
} else {
try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
Map<String, String> filteredHeaders = watch.status().getHeaders().entrySet().stream()
.filter(Watcher.HEADER_FILTERS::contains)
.filter(e -> Watcher.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
client.threadPool().getThreadContext().copyHeaders(filteredHeaders.entrySet());

View File

@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledge
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.noop.NoOpRenormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@ -64,6 +64,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
@ -72,6 +74,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
private JobProvider jobProvider;
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutoDetectResultProcessor resultProcessor;
private Renormalizer renormalizer;
@Override
protected Settings nodeSettings() {
@ -94,8 +97,9 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, new NoOpRenormalizer(),
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer,
new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
@Override
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
@ -170,6 +174,38 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
assertEquals(quantiles, persistedQuantiles.get());
}
/**
 * With the renormalizer reporting itself as enabled, quantiles emitted by the
 * process must be persisted and also handed to the renormalizer.
 */
public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
    when(renormalizer.isEnabled()).thenReturn(true);

    // Feed a single quantiles result through the processor
    ResultsBuilder resultsBuilder = new ResultsBuilder();
    Quantiles expectedQuantiles = createQuantiles();
    resultsBuilder.addQuantiles(expectedQuantiles);
    resultProcessor.process(resultsBuilder.buildTestProcess());
    resultProcessor.awaitCompletion();

    // The quantiles must have been stored unchanged
    Optional<Quantiles> storedQuantiles = getQuantiles();
    assertTrue(storedQuantiles.isPresent());
    assertEquals(expectedQuantiles, storedQuantiles.get());

    // ... and renormalization must have been requested for them
    verify(renormalizer).renormalize(expectedQuantiles);
}
/**
 * With the renormalizer reporting itself as disabled, quantiles emitted by the
 * process must still be persisted but never passed to the renormalizer.
 */
public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
    when(renormalizer.isEnabled()).thenReturn(false);

    // Feed a single quantiles result through the processor
    ResultsBuilder resultsBuilder = new ResultsBuilder();
    Quantiles expectedQuantiles = createQuantiles();
    resultsBuilder.addQuantiles(expectedQuantiles);
    resultProcessor.process(resultsBuilder.buildTestProcess());
    resultProcessor.awaitCompletion();

    // Persistence is unaffected by the renormalizer being disabled
    Optional<Quantiles> storedQuantiles = getQuantiles();
    assertTrue(storedQuantiles.isPresent());
    assertEquals(expectedQuantiles, storedQuantiles.get());

    // ... but no renormalization may be requested
    verify(renormalizer, never()).renormalize(expectedQuantiles);
}
public void testDeleteInterimResults() throws Exception {
Bucket nonInterimBucket = createBucket(false);
Bucket interimBucket = createBucket(true);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
@ -37,11 +38,45 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase {
cleanUp();
}
public void test() throws Exception {
/**
 * Runs a job without an explicit renormalization window and checks that the
 * earlier anomaly had its score lowered after the later, larger anomaly was seen.
 */
public void testDefaultRenormalization() throws Exception {
    String jobId = "basic-renormalization-it-test-default-renormalization-job";
    createAndRunJob(jobId, null);

    List<AnomalyRecord> anomalyRecords = getRecords(jobId);
    assertThat(anomalyRecords.size(), equalTo(2));

    // Index 0 is expected to hold the later (actual = 100) anomaly, index 1 the earlier (actual = 10) one
    AnomalyRecord later = anomalyRecords.get(0);
    assertThat(later.getActual().get(0), equalTo(100.0));
    AnomalyRecord earlier = anomalyRecords.get(1);
    assertThat(earlier.getActual().get(0), equalTo(10.0));
    assertThat(later.getRecordScore(), greaterThan(earlier.getRecordScore()));

    // Key assertion: had renormalization never run, the earlier record's record_score
    // would still be equal to its initial_record_score
    assertThat(earlier.getInitialRecordScore(), greaterThan(earlier.getRecordScore()));

    // Since this job ran for 50 buckets, it's a good place to assert
    // that established model memory matches model memory in the job stats
    GetJobsStatsAction.Response.JobStats stats = getJobStats(jobId).get(0);
    ModelSizeStats sizeStats = stats.getModelSizeStats();
    Job jobAfterRun = getJob(jobId).get(0);
    assertThat(jobAfterRun.getEstablishedModelMemory(), equalTo(sizeStats.getModelBytes()));
}
/**
 * Runs a job with a renormalization window of zero and checks that no record
 * had its score changed from the initial value.
 */
public void testRenormalizationDisabled() throws Exception {
    String jobId = "basic-renormalization-it-test-renormalization-disabled-job";
    createAndRunJob(jobId, 0L);

    // Without renormalization every record keeps its initial score
    getRecords(jobId).forEach(
            anomaly -> assertThat(anomaly.getInitialRecordScore(), equalTo(anomaly.getRecordScore())));
}
private void createAndRunJob(String jobId, Long renormalizationWindow) throws Exception {
TimeValue bucketSpan = TimeValue.timeValueHours(1);
long startTime = 1491004800000L;
Job.Builder job = buildAndRegisterJob("basic-renormalization-it-job", bucketSpan);
Job.Builder job = buildAndRegisterJob(jobId, bucketSpan, renormalizationWindow);
openJob(job.getId());
postData(job.getId(), generateData(startTime, bucketSpan, 50,
bucketIndex -> {
@ -56,28 +91,9 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase {
}
}).stream().collect(Collectors.joining()));
closeJob(job.getId());
List<AnomalyRecord> records = getRecords(job.getId());
assertThat(records.size(), equalTo(2));
AnomalyRecord laterRecord = records.get(0);
assertThat(laterRecord.getActual().get(0), equalTo(100.0));
AnomalyRecord earlierRecord = records.get(1);
assertThat(earlierRecord.getActual().get(0), equalTo(10.0));
assertThat(laterRecord.getRecordScore(), greaterThan(earlierRecord.getRecordScore()));
// This is the key assertion: if renormalization never happened then the record_score would
// be the same as the initial_record_score on the anomaly record that happened earlier
assertThat(earlierRecord.getInitialRecordScore(), greaterThan(earlierRecord.getRecordScore()));
// Since this job ran for 50 buckets, it's a good place to assert
// that established model memory matches model memory in the job stats
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
Job updatedJob = getJob(job.getId()).get(0);
assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
}
private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {
private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan, Long renormalizationWindow) throws Exception {
Detector.Builder detector = new Detector.Builder("count", null);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
@ -85,6 +101,9 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase {
job.setAnalysisConfig(analysisConfig);
DataDescription.Builder dataDescription = new DataDescription.Builder();
job.setDataDescription(dataDescription);
if (renormalizationWindow != null) {
job.setRenormalizationWindowDays(renormalizationWindow);
}
registerJob(job);
putJob(job);
return job;

View File

@ -325,7 +325,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verifyNoMoreInteractions(persister);
}
public void testProcessResult_quantiles() {
public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
@ -333,16 +333,36 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutodetectResult result = mock(AutodetectResult.class);
Quantiles quantiles = mock(Quantiles.class);
when(result.getQuantiles()).thenReturn(quantiles);
when(renormalizer.isEnabled()).thenReturn(true);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistQuantiles(quantiles);
verify(bulkBuilder).executeRequest();
verify(persister).commitResultWrites(JOB_ID);
verify(renormalizer, times(1)).isEnabled();
verify(renormalizer, times(1)).renormalize(quantiles);
verifyNoMoreInteractions(persister);
verifyNoMoreInteractions(renormalizer);
}
/**
 * When the renormalizer is disabled, a quantiles result is persisted and the bulk
 * request executed, but nothing else happens on the persister or the renormalizer.
 */
public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
    JobResultsPersister.Builder bulkPersister = mock(JobResultsPersister.Builder.class);
    AutoDetectResultProcessor.Context processingContext = new AutoDetectResultProcessor.Context(JOB_ID, bulkPersister);
    processingContext.deleteInterimRequired = false;

    // A result that carries only quantiles, with renormalization switched off
    AutodetectResult autodetectResult = mock(AutodetectResult.class);
    Quantiles parsedQuantiles = mock(Quantiles.class);
    when(autodetectResult.getQuantiles()).thenReturn(parsedQuantiles);
    when(renormalizer.isEnabled()).thenReturn(false);

    processorUnderTest.processResult(processingContext, autodetectResult);

    // Quantiles are persisted and the pending bulk request is executed ...
    verify(persister, times(1)).persistQuantiles(parsedQuantiles);
    verify(bulkPersister).executeRequest();
    // ... the renormalizer is only asked whether it is enabled ...
    verify(renormalizer, times(1)).isEnabled();
    // ... and no further persister or renormalizer calls are made
    verifyNoMoreInteractions(persister);
    verifyNoMoreInteractions(renormalizer);
}
public void testAwaitCompletion() throws TimeoutException {
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.Date;
@ -15,23 +16,33 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ShortCircuitingRenormalizerTests extends ESTestCase {
private static final String JOB_ID = "foo";
// Never reduce this below 4, otherwise some of the logic in the test will break
private static final int TEST_SIZE = 1000;
private ScoresUpdater scoresUpdater;
/**
 * Creates a fresh {@link ScoresUpdater} mock before each test, stubbed to report
 * a normalization window of 30.
 */
@Before
public void setUpMocks() {
    this.scoresUpdater = mock(ScoresUpdater.class);
    when(this.scoresUpdater.getNormalizationWindow()).thenReturn(30L);
}
public void testNormalize() throws InterruptedException {
ExecutorService threadpool = Executors.newScheduledThreadPool(10);
try {
ScoresUpdater scoresUpdater = mock(ScoresUpdater.class);
boolean isPerPartitionNormalization = randomBoolean();
ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, threadpool,
@ -76,4 +87,20 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
}
assertTrue(threadpool.awaitTermination(1, TimeUnit.SECONDS));
}
/**
 * A normalization window of zero must make the renormalizer report itself as disabled.
 */
public void testIsEnabled_GivenNormalizationWindowIsZero() {
    // Re-stub the fixture mock created in setUpMocks() rather than declaring a
    // local that shadows the scoresUpdater field; the later stubbing takes precedence.
    when(scoresUpdater.getNormalizationWindow()).thenReturn(0L);

    // The executor is not needed for isEnabled(), so null is passed for it
    ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean());

    assertThat(renormalizer.isEnabled(), is(false));
}
/**
 * Any strictly positive normalization window must make the renormalizer report itself as enabled.
 */
public void testIsEnabled_GivenNormalizationWindowGreaterThanZero() {
    // Re-stub the fixture mock created in setUpMocks() rather than declaring a
    // local that shadows the scoresUpdater field; the later stubbing takes precedence.
    when(scoresUpdater.getNormalizationWindow()).thenReturn(1L);

    // The executor is not needed for isEnabled(), so null is passed for it
    ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(JOB_ID, scoresUpdater, null, randomBoolean());

    assertThat(renormalizer.isEnabled(), is(true));
}
}