From 54d4f2a62393536e3ce19fe517822aedd285819f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 11 Jun 2020 12:27:44 +0100 Subject: [PATCH] [ML] Refresh annotations index on job flush and close (#57979) Now that annotations are part of the anomaly detection job results the annotations index should be refreshed on flushing and closing the job so that flush and close continue to fulfil their contracts that immediately after returning all results the job generated up to that point are searchable. --- .../ml/job/persistence/JobResultsPersister.java | 16 ++++++++++++++++ .../output/AutodetectResultProcessor.java | 2 ++ .../output/AutodetectResultProcessorTests.java | 12 +++++++++--- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index cb194556430..ac0d95115c8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -390,6 +391,21 @@ public class JobResultsPersister { } } + /** + * Makes annotations searchable as they are considered part of a job's results + * to fulfil the contract that job results are searchable immediately after a + * close or flush. + */ + public void commitAnnotationWrites() { + // We refresh using the read alias in order to ensure all indices will + // be refreshed even if a rollover occurs in between. + RefreshRequest refreshRequest = new RefreshRequest(AnnotationIndex.READ_ALIAS_NAME); + refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { + client.admin().indices().refresh(refreshRequest).actionGet(); + } + } + /** * Once the job state has been written calling this function makes it * immediately searchable. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 25a95dc9907..1d6b8106471 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -346,6 +346,7 @@ public class AutodetectResultProcessor { bulkResultsPersister.executeRequest(); bulkAnnotationsPersister.executeRequest(); persister.commitResultWrites(jobId); + persister.commitAnnotationWrites(); LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId()); } catch (Exception e) { LOGGER.error( @@ -470,6 +471,7 @@ public class AutodetectResultProcessor { // These lines ensure that the "completion" we're awaiting includes making the results searchable waitUntilRenormalizerIsIdle(); persister.commitResultWrites(jobId); + persister.commitAnnotationWrites(); persister.commitStateWrites(jobId); } catch (InterruptedException e) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 5f028310aa0..86df55e13b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -56,6 +56,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testProcess() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.process(); processorUnderTest.awaitCompletion(); @@ -147,6 +148,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(renormalizer).waitUntilIdle(); verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); } @@ -243,6 +245,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(bulkResultsPersister).executeRequest(); } @@ -264,6 +267,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); inOrder.verify(bulkResultsPersister).executeRequest(); inOrder.verify(persister).commitResultWrites(JOB_ID); + inOrder.verify(persister).commitAnnotationWrites(); inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); } @@ -453,7 +457,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testAwaitCompletion() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.process(); processorUnderTest.awaitCompletion(); @@ -462,6 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer).waitUntilIdle(); } @@ -503,7 +508,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testKill() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.setProcessKilled(); processorUnderTest.process(); @@ -513,6 +518,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer, never()).renormalize(any()); verify(renormalizer).shutdown();