From 72ad9a454899bf7d7ec95b370a7bbe96eff293bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Mon, 1 Jun 2020 12:05:09 +0200 Subject: [PATCH] [7.x] Make AnnotationPersister use bulk requests instead of indexing individual documents (#57278) (#57354) --- .../ml/annotations/AnnotationPersister.java | 67 ----- .../annotations/AnnotationPersisterTests.java | 119 --------- .../core/ml/annotations/AnnotationTests.java | 6 +- .../AutodetectResultProcessorIT.java | 4 +- .../ml/integration/EstablishedMemUsageIT.java | 2 +- .../integration/JobStorageDeletionTaskIT.java | 2 +- .../xpack/ml/MachineLearning.java | 5 +- .../ml/annotations/AnnotationPersister.java | 122 +++++++++ .../xpack/ml/datafeed/DatafeedJob.java | 14 +- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 2 +- .../job/persistence/JobResultsPersister.java | 20 +- .../autodetect/AutodetectProcessManager.java | 2 +- .../output/AutodetectResultProcessor.java | 8 +- .../annotations/AnnotationPersisterTests.java | 240 ++++++++++++++++++ .../ml/datafeed/DatafeedJobBuilderTests.java | 2 +- .../xpack/ml/datafeed/DatafeedJobTests.java | 141 ++++++---- .../persistence/JobResultsPersisterTests.java | 12 +- .../AutodetectProcessManagerTests.java | 6 +- .../AutodetectResultProcessorTests.java | 48 ++-- .../ResultsPersisterServiceTests.java | 2 +- 20 files changed, 519 insertions(+), 305 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersisterTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java deleted file mode 100644 index 0aca3ba1f03..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersister.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.annotations; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; - -import java.io.IOException; -import java.util.Objects; - -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; - -/** - * Persists annotations to Elasticsearch index. - */ -public class AnnotationPersister { - - private static final Logger logger = LogManager.getLogger(AnnotationPersister.class); - - private final Client client; - private final AbstractAuditor auditor; - - public AnnotationPersister(Client client, AbstractAuditor auditor) { - this.client = Objects.requireNonNull(client); - this.auditor = Objects.requireNonNull(auditor); - } - - /** - * Persists the given annotation to annotations index. - * - * @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically - * @param annotation annotation to be persisted - * @param errorMessage error message to report when annotation fails to be persisted - * @return tuple of the form (annotation id, annotation object) - */ - public Tuple persistAnnotation(@Nullable String annotationId, Annotation annotation, String errorMessage) { - Objects.requireNonNull(annotation); - try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - IndexRequest indexRequest = - new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) - .id(annotationId) - .source(xContentBuilder); - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - IndexResponse response = client.index(indexRequest).actionGet(); - return Tuple.tuple(response.getId(), annotation); - } - } catch (IOException ex) { - String jobId = annotation.getJobId(); - logger.error(errorMessage, ex); - auditor.error(jobId, errorMessage); - return null; - } - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java deleted file mode 100644 index 54d86a3bab5..00000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationPersisterTests.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.annotations; - -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.mockito.InOrder; - -import java.io.IOException; - -import static org.elasticsearch.common.collect.Tuple.tuple; -import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -public class AnnotationPersisterTests extends ESTestCase { - - private static final String ANNOTATION_ID = "existing_annotation_id"; - private static final String ERROR_MESSAGE = "an error occurred while persisting annotation"; - - private Client client; - private AbstractAuditor auditor; - private IndexResponse indexResponse; - - private ArgumentCaptor indexRequestCaptor; - - @Before - public void setUpMocks() { - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.getThreadContext()).thenReturn(threadContext); - client = mock(Client.class); - when(client.threadPool()).thenReturn(threadPool); - - auditor = mock(AbstractAuditor.class); - - indexResponse = mock(IndexResponse.class); - when(indexResponse.getId()).thenReturn(ANNOTATION_ID); - - indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); - } - - public void testPersistAnnotation_Create() throws IOException { - doReturn(instantFuture(indexResponse)).when(client).index(any()); - - AnnotationPersister persister = new AnnotationPersister(client, auditor); - Annotation annotation = AnnotationTests.randomAnnotation(); - Tuple result = persister.persistAnnotation(null, annotation, ERROR_MESSAGE); - assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); - - InOrder inOrder = inOrder(client); - inOrder.verify(client).threadPool(); - inOrder.verify(client).index(indexRequestCaptor.capture()); - verifyNoMoreInteractions(client, auditor); - - IndexRequest indexRequest = indexRequestCaptor.getValue(); - - assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); - assertThat(indexRequest.id(), is(nullValue())); - assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); - assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); - } - - public void testPersistAnnotation_Update() throws IOException { - doReturn(instantFuture(indexResponse)).when(client).index(any()); - - AnnotationPersister persister = new AnnotationPersister(client, auditor); - Annotation annotation = AnnotationTests.randomAnnotation(); - Tuple result = persister.persistAnnotation(ANNOTATION_ID, annotation, ERROR_MESSAGE); - assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); - - InOrder inOrder = inOrder(client); - inOrder.verify(client).threadPool(); - inOrder.verify(client).index(indexRequestCaptor.capture()); - verifyNoMoreInteractions(client, auditor); - - IndexRequest indexRequest = indexRequestCaptor.getValue(); - assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); - assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID))); - assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); - assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); - } - - @SuppressWarnings("unchecked") - private static ActionFuture instantFuture(T response) { - ActionFuture future = mock(ActionFuture.class); - when(future.actionGet()).thenReturn(response); - return future; - } - - private Annotation parseAnnotation(BytesReference source) throws IOException { - try (XContentParser parser = createParser(jsonXContent, source)) { - return Annotation.PARSER.parse(parser, null).build(); - } - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java index ea6c6de00c2..98e40e95259 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationTests.java @@ -22,17 +22,17 @@ public class AnnotationTests extends AbstractSerializingTestCase { @Override protected Annotation createTestInstance() { - return randomAnnotation(); + return randomAnnotation(randomBoolean() ? randomAlphaOfLengthBetween(10, 30) : null); } - static Annotation randomAnnotation() { + public static Annotation randomAnnotation(String jobId) { return new Annotation.Builder() .setAnnotation(randomAlphaOfLengthBetween(100, 1000)) .setCreateTime(new Date(randomNonNegativeLong())) .setCreateUsername(randomAlphaOfLengthBetween(5, 20)) .setTimestamp(new Date(randomNonNegativeLong())) .setEndTimestamp(randomBoolean() ? new Date(randomNonNegativeLong()) : null) - .setJobId(randomBoolean() ? randomAlphaOfLengthBetween(10, 30) : null) + .setJobId(jobId) .setModifiedTime(randomBoolean() ? new Date(randomNonNegativeLong()) : null) .setModifiedUsername(randomBoolean() ? randomAlphaOfLengthBetween(5, 20) : null) .setType(randomAlphaOfLengthBetween(10, 15)) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index c4b01a0f31e..5a535b9835a 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -55,6 +54,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.ml.LocalStateMachineLearning; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; @@ -149,7 +149,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { JOB_ID, renormalizer, new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")), - new AnnotationPersister(originSettingClient, auditor), + new AnnotationPersister(resultsPersisterService, auditor), process, new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java index 5355d65ecfa..6807439464e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java @@ -252,7 +252,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase { } private void createBuckets(String jobId, int count) { - JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true); + JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId); for (int i = 1; i <= count; ++i) { Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan); builder.persistBucket(bucket); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java index fa3db2956d9..9d209902ab3 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobStorageDeletionTaskIT.java @@ -200,7 +200,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase { } private void createBuckets(String jobId, int from, int count) { - JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true); + JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId); for (int i = from; i <= count + from; ++i) { Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan); builder.persistBucket(bucket); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b346574d354..bd037714991 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -127,7 +127,6 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; import org.elasticsearch.xpack.core.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider; import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider; @@ -200,6 +199,7 @@ import org.elasticsearch.xpack.ml.action.TransportUpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportUpdateProcessAction; import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction; import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -548,12 +548,13 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); - AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(client, anomalyDetectionAuditor); DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName()); InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName()); this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor); OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN); ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings); + AnnotationPersister anomalyDetectionAnnotationPersister = + new AnnotationPersister(resultsPersisterService, anomalyDetectionAuditor); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver); JobResultsPersister jobResultsPersister = new JobResultsPersister(originSettingClient, resultsPersisterService, anomalyDetectionAuditor); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java new file mode 100644 index 00000000000..3744e3e1a5b --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.annotations; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Persists annotations to Elasticsearch index. + */ +public class AnnotationPersister { + + private static final Logger logger = LogManager.getLogger(AnnotationPersister.class); + + private static final int DEFAULT_BULK_LIMIT = 10_000; + + private final ResultsPersisterService resultsPersisterService; + private final AbstractAuditor auditor; + /** + * Execute bulk requests when they reach this size + */ + private final int bulkLimit; + + public AnnotationPersister(ResultsPersisterService resultsPersisterService, AbstractAuditor auditor) { + this(resultsPersisterService, auditor, DEFAULT_BULK_LIMIT); + } + + // For testing + AnnotationPersister(ResultsPersisterService resultsPersisterService, AbstractAuditor auditor, int bulkLimit) { + this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService); + this.auditor = Objects.requireNonNull(auditor); + this.bulkLimit = bulkLimit; + } + + /** + * Persists the given annotation to annotations index. + * + * @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically + * @param annotation annotation to be persisted + * @return tuple of the form (annotation id, annotation object) + */ + public Tuple persistAnnotation(@Nullable String annotationId, Annotation annotation) { + Objects.requireNonNull(annotation); + String jobId = annotation.getJobId(); + BulkResponse bulkResponse = bulkPersisterBuilder(jobId).persistAnnotation(annotationId, annotation).executeRequest(); + assert bulkResponse.getItems().length == 1; + return Tuple.tuple(bulkResponse.getItems()[0].getId(), annotation); + } + + public Builder bulkPersisterBuilder(String jobId) { + return new Builder(jobId); + } + + public class Builder { + + private final String jobId; + private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + private Supplier shouldRetry = () -> true; + + private Builder(String jobId) { + this.jobId = Objects.requireNonNull(jobId); + } + + public Builder shouldRetry(Supplier shouldRetry) { + this.shouldRetry = Objects.requireNonNull(shouldRetry); + return this; + } + + public Builder persistAnnotation(Annotation annotation) { + return persistAnnotation(null, annotation); + } + + public Builder persistAnnotation(@Nullable String annotationId, Annotation annotation) { + Objects.requireNonNull(annotation); + try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { + bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder)); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error serialising annotation", jobId), e); + } + + if (bulkRequest.numberOfActions() >= bulkLimit) { + executeRequest(); + } + return this; + } + + /** + * Execute the bulk action + */ + public BulkResponse executeRequest() { + if (bulkRequest.numberOfActions() == 0) { + return null; + } + logger.trace("[{}] ES API CALL: bulk request with {} actions", () -> jobId, () -> bulkRequest.numberOfActions()); + BulkResponse bulkResponse = + resultsPersisterService.bulkIndexWithRetry( + bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg)); + bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + return bulkResponse; + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 7c789a86231..22ebfda9dab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -21,13 +21,13 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.security.user.XPackUser; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -223,19 +223,11 @@ class DatafeedJob { auditor.warning(jobId, msg); if (lastDataCheckAnnotationWithId == null) { - lastDataCheckAnnotationWithId = - annotationPersister.persistAnnotation( - null, - annotation, - "[" + jobId + "] failed to create annotation for delayed data checker."); + lastDataCheckAnnotationWithId = annotationPersister.persistAnnotation(null, annotation); } else { String annotationId = lastDataCheckAnnotationWithId.v1(); Annotation updatedAnnotation = updateAnnotation(annotation); - lastDataCheckAnnotationWithId = - annotationPersister.persistAnnotation( - annotationId, - updatedAnnotation, - "[" + jobId + "] failed to update annotation for delayed data checker."); + lastDataCheckAnnotationWithId = annotationPersister.persistAnnotation(annotationId, updatedAnnotation); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index ec3270bf472..6fb1c80a086 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -16,7 +16,6 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.action.util.QueryPage; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; @@ -27,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 04c2c850ba2..cb194556430 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -89,23 +89,25 @@ public class JobResultsPersister { this.auditor = auditor; } - public Builder bulkPersisterBuilder(String jobId, Supplier shouldRetry) { - return new Builder(jobId, resultsPersisterService, shouldRetry); + public Builder bulkPersisterBuilder(String jobId) { + return new Builder(jobId); } public class Builder { private BulkRequest bulkRequest; private final String jobId; private final String indexName; - private final Supplier shouldRetry; - private final ResultsPersisterService resultsPersisterService; + private Supplier shouldRetry = () -> true; - private Builder(String jobId, ResultsPersisterService resultsPersisterService, Supplier shouldRetry) { + private Builder(String jobId) { + this.bulkRequest = new BulkRequest(); this.jobId = Objects.requireNonNull(jobId); - indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); - bulkRequest = new BulkRequest(); - this.shouldRetry = shouldRetry; - this.resultsPersisterService = resultsPersisterService; + this.indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); + } + + public Builder shouldRetry(Supplier shouldRetry) { + this.shouldRetry = Objects.requireNonNull(shouldRetry); + return this; } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index c428bcb397c..27c5aab1849 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -34,7 +34,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -50,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index 769e30ed739..f919ffa37f4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -21,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; @@ -135,7 +135,7 @@ public class AutodetectResultProcessor { this.process = Objects.requireNonNull(autodetectProcess); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); - this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive); + this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive); this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister); this.clock = Objects.requireNonNull(clock); this.deleteInterimRequired = true; @@ -314,9 +314,7 @@ public class AutodetectResultProcessor { updateModelSnapshotOnJob(modelSnapshot); } annotationPersister.persistAnnotation( - ModelSnapshot.annotationDocumentId(modelSnapshot), - createModelSnapshotAnnotation(modelSnapshot), - "[" + jobId + "] failed to create annotation for model snapshot."); + ModelSnapshot.annotationDocumentId(modelSnapshot), createModelSnapshotAnnotation(modelSnapshot)); } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersisterTests.java new file mode 100644 index 00000000000..d8524f429b3 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersisterTests.java @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.annotations; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; +import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests; +import org.elasticsearch.xpack.ml.test.MockOriginSettingClient; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterServiceTests; +import org.junit.After; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.common.collect.Tuple.tuple; +import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class AnnotationPersisterTests extends ESTestCase { + + private static final String ANNOTATION_ID = "existing_annotation_id"; + private static final String JOB_ID = "job_id"; + + private Client client; + private OriginSettingClient originSettingClient; + private ResultsPersisterService resultsPersisterService; + private AbstractAuditor auditor; + + private ArgumentCaptor bulkRequestCaptor; + + @Before + public void setUpMocks() { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(threadContext); + client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + + originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); + resultsPersisterService = ResultsPersisterServiceTests.buildResultsPersisterService(originSettingClient); + + auditor = mock(AbstractAuditor.class); + + bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class); + } + + @After + public void verifyNoMoreInteractionsWithMocks() { + verify(client, atLeastOnce()).settings(); + verify(client, atLeastOnce()).threadPool(); + verifyNoMoreInteractions(client, auditor); + } + + public void testPersistAnnotation_Create() throws IOException { + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L))) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor); + Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID); + Tuple result = persister.persistAnnotation(null, annotation); + assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); + + verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); + + List bulkRequests = bulkRequestCaptor.getAllValues(); + assertThat(bulkRequests, hasSize(1)); + BulkRequest bulkRequest = bulkRequests.get(0); + assertThat(bulkRequest.numberOfActions(), equalTo(1)); + + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); + assertThat(indexRequest.id(), is(nullValue())); + assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); + } + + public void testPersistAnnotation_Update() throws IOException { + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L))) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor); + Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID); + Tuple result = persister.persistAnnotation(ANNOTATION_ID, annotation); + assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation)))); + + verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); + + List bulkRequests = bulkRequestCaptor.getAllValues(); + assertThat(bulkRequests, hasSize(1)); + BulkRequest bulkRequest = bulkRequests.get(0); + assertThat(bulkRequest.numberOfActions(), equalTo(1)); + + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME))); + assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID))); + assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation))); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); + } + + public void testPersistMultipleAnnotationsWithBulk() { + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L))) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor); + persister.bulkPersisterBuilder(JOB_ID) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .executeRequest(); + + verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); + + List bulkRequests = bulkRequestCaptor.getAllValues(); + assertThat(bulkRequests, hasSize(1)); + assertThat(bulkRequests.get(0).numberOfActions(), equalTo(5)); + } + + public void testPersistMultipleAnnotationsWithBulk_LowBulkLimit() { + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L))) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor, 2); + persister.bulkPersisterBuilder(JOB_ID) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID)) + .executeRequest(); + + verify(client, times(3)).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); + + List bulkRequests = bulkRequestCaptor.getAllValues(); + assertThat(bulkRequests, hasSize(3)); + assertThat(bulkRequests.get(0).numberOfActions(), equalTo(2)); + assertThat(bulkRequests.get(1).numberOfActions(), equalTo(2)); + assertThat(bulkRequests.get(2).numberOfActions(), equalTo(1)); + } + + public void testPersistMultipleAnnotationsWithBulk_EmptyRequest() { + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor); + assertThat(persister.bulkPersisterBuilder(JOB_ID).executeRequest(), is(nullValue())); + } + + public void testPersistMultipleAnnotationsWithBulk_Failure() { + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemFailure("1"), bulkItemFailure("2")}, 0L))) // (1) + .doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemSuccess("1"), bulkItemFailure("2")}, 0L))) // (2) + .doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemFailure("2")}, 0L))) // (3) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); + + AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor); + AnnotationPersister.Builder persisterBuilder = persister.bulkPersisterBuilder(JOB_ID) + .persistAnnotation("1", AnnotationTests.randomAnnotation(JOB_ID)) + .persistAnnotation("2", AnnotationTests.randomAnnotation(JOB_ID)); + ElasticsearchException e = expectThrows(ElasticsearchException.class, persisterBuilder::executeRequest); + assertThat(e.getMessage(), containsString("failed to index after")); + + verify(client, atLeastOnce()).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); + verify(auditor, atLeastOnce()).warning(any(), any()); + + List bulkRequests = bulkRequestCaptor.getAllValues(); + assertThat(bulkRequests.get(0).numberOfActions(), equalTo(2)); // Original bulk request of size 2 + assertThat(bulkRequests.get(1).numberOfActions(), equalTo(2)); // Bulk request created from two failures returned in (1) + for (int i = 2; i < bulkRequests.size(); ++i) { + assertThat(bulkRequests.get(i).numberOfActions(), equalTo(1)); // Bulk request created from one failure returned in (2) and (3) + } + } + + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }; + } + + private static BulkItemResponse bulkItemSuccess(String docId) { + return new BulkItemResponse( + 1, + DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId(AnnotationIndex.WRITE_ALIAS_NAME, "uuid", 1), "doc", docId, 0, 0, 1, true)); + } + + private static BulkItemResponse bulkItemFailure(String docId) { + return new BulkItemResponse( + 2, + DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("my-index", "doc", docId, new Exception("boom"))); + } + + private Annotation parseAnnotation(BytesReference source) throws IOException { + try (XContentParser parser = createParser(jsonXContent, source)) { + return Annotation.PARSER.parse(parser, null).build(); + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index f0bf91c6f18..b951519ae39 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -16,13 +16,13 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.util.QueryPage; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 5aa4483c9cc..b0541e2e46b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -6,15 +6,21 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -22,25 +28,29 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.security.user.XPackUser; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.core.ml.job.config.DataDescription; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; +import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterServiceTests; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -54,6 +64,7 @@ import java.util.Optional; import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -61,6 +72,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -76,11 +88,11 @@ public class DatafeedJobTests extends ESTestCase { private DataExtractor dataExtractor; private DatafeedTimingStatsReporter timingStatsReporter; private Client client; + private ResultsPersisterService resultsPersisterService; private DelayedDataDetector delayedDataDetector; private DataDescription.Builder dataDescription; private ActionFuture postDataFuture; private ActionFuture flushJobFuture; - private ActionFuture indexFuture; private ArgumentCaptor flushJobRequests; private FlushJobAction.Response flushJobResponse; private String annotationDocId; @@ -98,13 +110,15 @@ public class DatafeedJobTests extends ESTestCase { timingStatsReporter = mock(DatafeedTimingStatsReporter.class); client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + when(client.settings()).thenReturn(Settings.EMPTY); + resultsPersisterService = + ResultsPersisterServiceTests.buildResultsPersisterService(new OriginSettingClient(client, ClientHelper.ML_ORIGIN)); dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); postDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); - indexFuture = mock(ActionFuture.class); annotationDocId = "AnnotationDocId"; flushJobResponse = new FlushJobAction.Response(true, new Date()); delayedDataDetector = mock(DelayedDataDetector.class); @@ -129,8 +143,8 @@ public class DatafeedJobTests extends ESTestCase { when(flushJobFuture.actionGet()).thenReturn(flushJobResponse); when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture); - when(indexFuture.actionGet()).thenReturn(new IndexResponse(new ShardId("index", "uuid", 0), "doc", annotationDocId, 0, 0, 0, true)); - when(client.index(any())).thenReturn(indexFuture); + doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(annotationDocId) }, 0L))) + .when(client).execute(eq(BulkAction.INSTANCE), any(), any()); } public void testLookBackRunWithEndTime() throws Exception { @@ -272,26 +286,32 @@ public class DatafeedJobTests extends ESTestCase { 10, XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000)); - Annotation expectedAnnotation = new Annotation.Builder() - .setAnnotation(msg) - .setCreateTime(new Date(currentTime)) - .setCreateUsername(XPackUser.NAME) - .setTimestamp(bucket.getTimestamp()) - .setEndTimestamp(new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000)) - .setJobId(jobId) - .setModifiedTime(new Date(currentTime)) - .setModifiedUsername(XPackUser.NAME) - .setType("annotation") - .build(); + long annotationCreateTime = currentTime; + { // What we expect the created annotation to be indexed as + Annotation expectedAnnotation = new Annotation.Builder() + .setAnnotation(msg) + .setCreateTime(new Date(annotationCreateTime)) + .setCreateUsername(XPackUser.NAME) + .setTimestamp(bucket.getTimestamp()) + .setEndTimestamp(new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000)) + .setJobId(jobId) + .setModifiedTime(new Date(annotationCreateTime)) + .setModifiedUsername(XPackUser.NAME) + .setType("annotation") + .build(); + BytesReference expectedSource = + BytesReference.bytes(expectedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); - IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); - try (XContentBuilder xContentBuilder = expectedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - request.source(xContentBuilder); + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(client, atMost(2)).execute(eq(BulkAction.INSTANCE), bulkRequestArgumentCaptor.capture(), any()); + BulkRequest bulkRequest = bulkRequestArgumentCaptor.getValue(); + assertThat(bulkRequest.requests(), hasSize(1)); + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME)); + assertThat(indexRequest.id(), nullValue()); + assertThat(indexRequest.source(), equalTo(expectedSource)); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); } - ArgumentCaptor indexRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class); - verify(client, atMost(2)).index(indexRequestArgumentCaptor.capture()); - assertThat(request.index(), equalTo(indexRequestArgumentCaptor.getValue().index())); - assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source())); // Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated Bucket bucket2 = mock(Bucket.class); @@ -311,27 +331,34 @@ public class DatafeedJobTests extends ESTestCase { msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, 15, XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000)); - // What we expect the updated annotation to be indexed as - IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME); - indexRequest.id(annotationDocId); - Annotation updatedAnnotation = new Annotation.Builder(expectedAnnotation) - .setAnnotation(msg) - .setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000)) - .setModifiedTime(new Date(currentTime)) - .build(); - try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - indexRequest.source(xContentBuilder); + + long annotationUpdateTime = currentTime; + { // What we expect the updated annotation to be indexed as + Annotation expectedUpdatedAnnotation = new Annotation.Builder() + .setAnnotation(msg) + .setCreateTime(new Date(annotationCreateTime)) + .setCreateUsername(XPackUser.NAME) + .setTimestamp(bucket.getTimestamp()) + .setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000)) + .setJobId(jobId) + .setModifiedTime(new Date(annotationUpdateTime)) + .setModifiedUsername(XPackUser.NAME) + .setType("annotation") + .build(); + BytesReference expectedSource = + BytesReference.bytes(expectedUpdatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); + + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(client, atMost(2)).execute(eq(BulkAction.INSTANCE), bulkRequestArgumentCaptor.capture(), any()); + BulkRequest bulkRequest = bulkRequestArgumentCaptor.getValue(); + assertThat(bulkRequest.requests(), hasSize(1)); + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME)); + assertThat(indexRequest.id(), equalTo(annotationDocId)); + assertThat(indexRequest.source(), equalTo(expectedSource)); + assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX)); } - ArgumentCaptor updateRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class); - - verify(client, atMost(2)).index(updateRequestArgumentCaptor.capture()); - assertThat(indexRequest.index(), equalTo(updateRequestArgumentCaptor.getValue().index())); - assertThat(indexRequest.id(), equalTo(updateRequestArgumentCaptor.getValue().id())); - assertThat(indexRequest.source().utf8ToString(), - equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString())); - assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX)); - // Execute a fifth time, no changes should occur as annotation is the same currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; inputStream = new ByteArrayInputStream(contentBytes); @@ -461,7 +488,23 @@ public class DatafeedJobTests extends ESTestCase { long latestRecordTimeMs, boolean haveSeenDataPreviously) { Supplier currentTimeSupplier = () -> currentTime; return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter, - client, auditor, new AnnotationPersister(client, auditor), currentTimeSupplier, delayedDataDetector, null, - latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously); + client, auditor, new AnnotationPersister(resultsPersisterService, auditor), currentTimeSupplier, + delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously); + } + + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }; + } + + private static BulkItemResponse bulkItemSuccess(String docId) { + return new BulkItemResponse( + 1, + DocWriteRequest.OpType.INDEX, + new IndexResponse(new ShardId(AnnotationIndex.WRITE_ALIAS_NAME, "uuid", 1), "doc", docId, 0, 0, 1, true)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 2a8759d5825..a5fbd5f8eb2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -108,7 +108,7 @@ public class JobResultsPersisterTests extends ESTestCase { AnomalyRecord record = new AnomalyRecord(JOB_ID, new Date(), 600); bucket.setRecords(Collections.singletonList(record)); - persister.bulkPersisterBuilder(JOB_ID, () -> true).persistBucket(bucket).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest(); verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); @@ -159,7 +159,7 @@ public class JobResultsPersisterTests extends ESTestCase { typicals.add(998765.3); r1.setTypical(typicals); - persister.bulkPersisterBuilder(JOB_ID, () -> true).persistRecords(records).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest(); verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); @@ -194,7 +194,7 @@ public class JobResultsPersisterTests extends ESTestCase { inf.setProbability(0.4); influencers.add(inf); - persister.bulkPersisterBuilder(JOB_ID, () -> true).persistInfluencers(influencers).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest(); verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any()); @@ -217,13 +217,13 @@ public class JobResultsPersisterTests extends ESTestCase { inf.setProbability(0.4); influencers.add(inf); - JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID, () -> true); + JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID); builder.persistInfluencers(influencers).executeRequest(); assertEquals(0, builder.getBulkRequest().numberOfActions()); } public void testBulkRequestExecutesWhenReachMaxDocs() { - JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo", () -> true); + JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo"); ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456, 0); for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) { bulkBuilder.persistModelPlot(modelPlot); @@ -240,7 +240,7 @@ public class JobResultsPersisterTests extends ESTestCase { TimingStats timingStats = new TimingStats( "foo", 7, 1.0, 2.0, 1.23, 7.89, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0)); - persister.bulkPersisterBuilder(JOB_ID, () -> true).persistTimingStats(timingStats).executeRequest(); + persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest(); InOrder inOrder = inOrder(client); inOrder.verify(client).settings(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index e6fdb6ad155..956052b367e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; @@ -49,6 +48,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapsho import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; @@ -152,7 +152,9 @@ public class AutodetectProcessManagerTests extends ESTestCase { jobManager = mock(JobManager.class); jobResultsProvider = mock(JobResultsProvider.class); jobResultsPersister = mock(JobResultsPersister.class); - when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(mock(JobResultsPersister.Builder.class)); + JobResultsPersister.Builder bulkPersisterBuilder = mock(JobResultsPersister.Builder.class); + when(bulkPersisterBuilder.shouldRetry(any())).thenReturn(bulkPersisterBuilder); + when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(bulkPersisterBuilder); jobDataCountsPersister = mock(JobDataCountsPersister.class); annotationPersister = mock(AnnotationPersister.class); autodetectCommunicator = mock(AutodetectCommunicator.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 15a7cbb0e6e..c889a4786e7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; -import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; @@ -39,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.core.security.user.XPackUser; +import org.elasticsearch.xpack.ml.annotations.AnnotationPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -105,8 +105,9 @@ public class AutodetectResultProcessorTests extends ESTestCase { renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); bulkBuilder = mock(JobResultsPersister.Builder.class); + when(bulkBuilder.shouldRetry(any())).thenReturn(bulkBuilder); annotationPersister = mock(AnnotationPersister.class); - when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkBuilder); + when(persister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkBuilder); process = mock(AutodetectProcess.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutodetectResultProcessor( @@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); verify(renormalizer).waitUntilIdle(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); } @@ -156,7 +157,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder).persistBucket(bucket); verify(bulkBuilder).executeRequest(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister, never()).deleteInterimResults(JOB_ID); } @@ -173,7 +174,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder).persistBucket(bucket); verify(bulkBuilder).executeRequest(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).deleteInterimResults(JOB_ID); } @@ -190,7 +191,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).persistRecords(records); verify(bulkBuilder, never()).executeRequest(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); } public void testProcessResult_influencers() { @@ -206,7 +207,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder).persistInfluencers(influencers); verify(bulkBuilder, never()).executeRequest(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); } public void testProcessResult_categoryDefinition() { @@ -220,7 +221,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { verify(bulkBuilder, never()).executeRequest(); verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); } public void testProcessResult_flushAcknowledgement() { @@ -233,7 +234,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.processResult(result); assertTrue(processorUnderTest.isDeleteInterimRequired()); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(flushListener).acknowledgeFlush(flushAcknowledgement, null); verify(persister).commitResultWrites(JOB_ID); verify(bulkBuilder).executeRequest(); @@ -253,7 +254,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { assertTrue(processorUnderTest.isDeleteInterimRequired()); InOrder inOrder = inOrder(persister, bulkBuilder, flushListener); - inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID)); inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any()); inOrder.verify(bulkBuilder).executeRequest(); inOrder.verify(persister).commitResultWrites(JOB_ID); @@ -268,7 +269,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(bulkBuilder).persistModelPlot(modelPlot); } @@ -281,7 +282,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.processResult(result); assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats))); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).persistModelSizeStats(eq(modelSizeStats), any()); } @@ -313,7 +314,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class), any()); // We should have only fired two notifications: one for soft_limit and one for hard_limit verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT)); @@ -340,7 +341,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any()); // We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0)); @@ -356,7 +357,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).persistModelSizeStats(any(ModelSizeStats.class), any()); // We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0)); @@ -379,12 +380,11 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); ArgumentCaptor annotationCaptor = ArgumentCaptor.forClass(Annotation.class); - verify(annotationPersister).persistAnnotation( - eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture(), any()); + verify(annotationPersister).persistAnnotation(eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture()); Annotation annotation = annotationCaptor.getValue(); Annotation expectedAnnotation = new Annotation.Builder() @@ -415,7 +415,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).persistQuantiles(eq(quantiles), any()); verify(bulkBuilder).executeRequest(); verify(persister).commitResultWrites(JOB_ID); @@ -432,7 +432,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.setDeleteInterimRequired(false); processorUnderTest.processResult(result); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).persistQuantiles(eq(quantiles), any()); verify(bulkBuilder).executeRequest(); verify(renormalizer).isEnabled(); @@ -447,7 +447,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer).waitUntilIdle(); @@ -466,7 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.process(); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); } @@ -485,7 +485,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { processorUnderTest.waitForFlushAcknowledgement(JOB_ID, Duration.of(300, ChronoUnit.SECONDS)); assertThat(flushAcknowledgement, is(nullValue())); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); } public void testKill() throws TimeoutException { @@ -498,7 +498,7 @@ public class AutodetectResultProcessorTests extends ESTestCase { assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); - verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); + verify(persister).bulkPersisterBuilder(eq(JOB_ID)); verify(persister).commitResultWrites(JOB_ID); verify(persister).commitStateWrites(JOB_ID); verify(renormalizer, never()).renormalize(any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java index e09dd8f3245..5b9f72109bd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java @@ -368,7 +368,7 @@ public class ResultsPersisterServiceTests extends ESTestCase { }; } - private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) { + public static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) { CheckedConsumer sleeper = millis -> {}; ThreadPool tp = mock(ThreadPool.class); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,