diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index cb194556430..ac0d95115c8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -390,6 +391,21 @@ public class JobResultsPersister { } } + /** + * Makes annotations searchable as they are considered part of a job's results + * to fulfil the contract that job results are searchable immediately after a + * close or flush. + */ + public void commitAnnotationWrites() { + // We refresh using the read alias in order to ensure all indices will + // be refreshed even if a rollover occurs in between. + RefreshRequest refreshRequest = new RefreshRequest(AnnotationIndex.READ_ALIAS_NAME); + refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { + client.admin().indices().refresh(refreshRequest).actionGet(); + } + } + /** * Once the job state has been written calling this function makes it * immediately searchable. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 25a95dc9907..1d6b8106471 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -346,6 +346,7 @@ public class AutodetectResultProcessor { bulkResultsPersister.executeRequest(); bulkAnnotationsPersister.executeRequest(); persister.commitResultWrites(jobId); + persister.commitAnnotationWrites(); LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId()); } catch (Exception e) { LOGGER.error( @@ -470,6 +471,7 @@ public class AutodetectResultProcessor { // These lines ensure that the "completion" we're awaiting includes making the results searchable waitUntilRenormalizerIsIdle(); persister.commitResultWrites(jobId); + persister.commitAnnotationWrites(); persister.commitStateWrites(jobId); } catch (InterruptedException e) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 5f028310aa0..86df55e13b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -56,6 +56,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testProcess() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.process(); processorUnderTest.awaitCompletion(); @@ -147,6 +148,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(renormalizer).waitUntilIdle(); verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); } @@ -243,6 +245,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(bulkResultsPersister).executeRequest(); } @@ -264,6 +267,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); inOrder.verify(bulkResultsPersister).executeRequest(); inOrder.verify(persister).commitResultWrites(JOB_ID); + inOrder.verify(persister).commitAnnotationWrites(); inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); } @@ -453,7 +457,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testAwaitCompletion() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.process(); processorUnderTest.awaitCompletion(); @@ -462,6 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer).waitUntilIdle(); } @@ -503,7 +508,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { public void testKill() throws TimeoutException { AutodetectResult autodetectResult = mock(AutodetectResult.class); - when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator()); + when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.setProcessKilled(); processorUnderTest.process(); @@ -513,6 +518,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); + verify(persister).commitAnnotationWrites(); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer, never()).renormalize(any()); verify(renormalizer).shutdown();