[ML] Refresh annotations index on job flush and close (#57979)

Now that annotations are part of the anomaly detection job results
the annotations index should be refreshed on flushing and closing
the job so that flush and close continue to fulfil their contracts
that immediately after returning all results the job generated up
to that point are searchable.
This commit is contained in:
David Roberts 2020-06-11 12:27:44 +01:00
parent b87b147704
commit 54d4f2a623
3 changed files with 27 additions and 3 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@ -390,6 +391,21 @@ public class JobResultsPersister {
}
}
/**
* Makes annotations searchable as they are considered part of a job's results
* to fulfil the contract that job results are searchable immediately after a
* close or flush.
*/
public void commitAnnotationWrites() {
// We refresh using the read alias in order to ensure all indices will
// be refreshed even if a rollover occurs in between.
RefreshRequest refreshRequest = new RefreshRequest(AnnotationIndex.READ_ALIAS_NAME);
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
client.admin().indices().refresh(refreshRequest).actionGet();
}
}
/**
* Once the job state has been written calling this function makes it
* immediately searchable.

View File

@ -346,6 +346,7 @@ public class AutodetectResultProcessor {
bulkResultsPersister.executeRequest();
bulkAnnotationsPersister.executeRequest();
persister.commitResultWrites(jobId);
persister.commitAnnotationWrites();
LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
} catch (Exception e) {
LOGGER.error(
@ -470,6 +471,7 @@ public class AutodetectResultProcessor {
// These lines ensure that the "completion" we're awaiting includes making the results searchable
waitUntilRenormalizerIsIdle();
persister.commitResultWrites(jobId);
persister.commitAnnotationWrites();
persister.commitStateWrites(jobId);
} catch (InterruptedException e) {

View File

@ -56,6 +56,7 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
public void testProcess() throws TimeoutException {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
processorUnderTest.process();
processorUnderTest.awaitCompletion();
@ -147,6 +148,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(renormalizer).waitUntilIdle();
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitAnnotationWrites();
verify(persister).commitStateWrites(JOB_ID);
}
@ -243,6 +245,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitAnnotationWrites();
verify(bulkResultsPersister).executeRequest();
}
@ -264,6 +267,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
inOrder.verify(bulkResultsPersister).executeRequest();
inOrder.verify(persister).commitResultWrites(JOB_ID);
inOrder.verify(persister).commitAnnotationWrites();
inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
}
@ -453,7 +457,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
public void testAwaitCompletion() throws TimeoutException {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
processorUnderTest.process();
processorUnderTest.awaitCompletion();
@ -462,6 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitAnnotationWrites();
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer).waitUntilIdle();
}
@ -503,7 +508,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
public void testKill() throws TimeoutException {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());
processorUnderTest.setProcessKilled();
processorUnderTest.process();
@ -513,6 +518,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitAnnotationWrites();
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer, never()).renormalize(any());
verify(renormalizer).shutdown();