[7.x] Make AnnotationPersister use bulk requests instead of indexing individual documents (#57278) (#57354)

This commit is contained in:
Przemysław Witek 2020-06-01 12:05:09 +02:00 committed by GitHub
parent 9fdf1722e6
commit 72ad9a4548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 519 additions and 305 deletions

View File

@ -1,67 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.annotations;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
/**
* Persists annotations to Elasticsearch index.
*/
public class AnnotationPersister {
private static final Logger logger = LogManager.getLogger(AnnotationPersister.class);
private final Client client;
private final AbstractAuditor auditor;
public AnnotationPersister(Client client, AbstractAuditor auditor) {
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
}
/**
* Persists the given annotation to annotations index.
*
* @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically
* @param annotation annotation to be persisted
* @param errorMessage error message to report when annotation fails to be persisted
* @return tuple of the form (annotation id, annotation object)
*/
public Tuple<String, Annotation> persistAnnotation(@Nullable String annotationId, Annotation annotation, String errorMessage) {
Objects.requireNonNull(annotation);
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
IndexRequest indexRequest =
new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
.id(annotationId)
.source(xContentBuilder);
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
IndexResponse response = client.index(indexRequest).actionGet();
return Tuple.tuple(response.getId(), annotation);
}
} catch (IOException ex) {
String jobId = annotation.getJobId();
logger.error(errorMessage, ex);
auditor.error(jobId, errorMessage);
return null;
}
}
}

View File

@ -1,119 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.annotations;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import java.io.IOException;
import static org.elasticsearch.common.collect.Tuple.tuple;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class AnnotationPersisterTests extends ESTestCase {
private static final String ANNOTATION_ID = "existing_annotation_id";
private static final String ERROR_MESSAGE = "an error occurred while persisting annotation";
private Client client;
private AbstractAuditor auditor;
private IndexResponse indexResponse;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
@Before
public void setUpMocks() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
auditor = mock(AbstractAuditor.class);
indexResponse = mock(IndexResponse.class);
when(indexResponse.getId()).thenReturn(ANNOTATION_ID);
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
}
public void testPersistAnnotation_Create() throws IOException {
doReturn(instantFuture(indexResponse)).when(client).index(any());
AnnotationPersister persister = new AnnotationPersister(client, auditor);
Annotation annotation = AnnotationTests.randomAnnotation();
Tuple<String, Annotation> result = persister.persistAnnotation(null, annotation, ERROR_MESSAGE);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
InOrder inOrder = inOrder(client);
inOrder.verify(client).threadPool();
inOrder.verify(client).index(indexRequestCaptor.capture());
verifyNoMoreInteractions(client, auditor);
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(nullValue()));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
public void testPersistAnnotation_Update() throws IOException {
doReturn(instantFuture(indexResponse)).when(client).index(any());
AnnotationPersister persister = new AnnotationPersister(client, auditor);
Annotation annotation = AnnotationTests.randomAnnotation();
Tuple<String, Annotation> result = persister.persistAnnotation(ANNOTATION_ID, annotation, ERROR_MESSAGE);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
InOrder inOrder = inOrder(client);
inOrder.verify(client).threadPool();
inOrder.verify(client).index(indexRequestCaptor.capture());
verifyNoMoreInteractions(client, auditor);
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID)));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
@SuppressWarnings("unchecked")
private static <T> ActionFuture<T> instantFuture(T response) {
ActionFuture future = mock(ActionFuture.class);
when(future.actionGet()).thenReturn(response);
return future;
}
private Annotation parseAnnotation(BytesReference source) throws IOException {
try (XContentParser parser = createParser(jsonXContent, source)) {
return Annotation.PARSER.parse(parser, null).build();
}
}
}

View File

@ -22,17 +22,17 @@ public class AnnotationTests extends AbstractSerializingTestCase<Annotation> {
@Override
protected Annotation createTestInstance() {
return randomAnnotation();
return randomAnnotation(randomBoolean() ? randomAlphaOfLengthBetween(10, 30) : null);
}
static Annotation randomAnnotation() {
public static Annotation randomAnnotation(String jobId) {
return new Annotation.Builder()
.setAnnotation(randomAlphaOfLengthBetween(100, 1000))
.setCreateTime(new Date(randomNonNegativeLong()))
.setCreateUsername(randomAlphaOfLengthBetween(5, 20))
.setTimestamp(new Date(randomNonNegativeLong()))
.setEndTimestamp(randomBoolean() ? new Date(randomNonNegativeLong()) : null)
.setJobId(randomBoolean() ? randomAlphaOfLengthBetween(10, 30) : null)
.setJobId(jobId)
.setModifiedTime(randomBoolean() ? new Date(randomNonNegativeLong()) : null)
.setModifiedUsername(randomBoolean() ? randomAlphaOfLengthBetween(5, 20) : null)
.setType(randomAlphaOfLengthBetween(10, 15))

View File

@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@ -55,6 +54,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
@ -149,7 +149,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
JOB_ID,
renormalizer,
new JobResultsPersister(originSettingClient, resultsPersisterService, new AnomalyDetectionAuditor(client(), "test_node")),
new AnnotationPersister(originSettingClient, auditor),
new AnnotationPersister(resultsPersisterService, auditor),
process,
new ModelSizeStats.Builder(JOB_ID).build(),
new TimingStats(JOB_ID)) {

View File

@ -252,7 +252,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
}
private void createBuckets(String jobId, int count) {
JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true);
JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId);
for (int i = 1; i <= count; ++i) {
Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan);
builder.persistBucket(bucket);

View File

@ -200,7 +200,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
}
private void createBuckets(String jobId, int from, int count) {
JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId, () -> true);
JobResultsPersister.Builder builder = jobResultsPersister.bulkPersisterBuilder(jobId);
for (int i = from; i <= count + from; ++i) {
Bucket bucket = new Bucket(jobId, new Date(bucketSpan * i), bucketSpan);
builder.persistBucket(bucket);

View File

@ -127,7 +127,6 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider;
@ -200,6 +199,7 @@ import org.elasticsearch.xpack.ml.action.TransportUpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateProcessAction;
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@ -548,12 +548,13 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
AnnotationPersister anomalyDetectionAnnotationPersister = new AnnotationPersister(client, anomalyDetectionAuditor);
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
InferenceAuditor inferenceAuditor = new InferenceAuditor(client, clusterService.getNodeName());
this.dataFrameAnalyticsAuditor.set(dataFrameAnalyticsAuditor);
OriginSettingClient originSettingClient = new OriginSettingClient(client, ClientHelper.ML_ORIGIN);
ResultsPersisterService resultsPersisterService = new ResultsPersisterService(originSettingClient, clusterService, settings);
AnnotationPersister anomalyDetectionAnnotationPersister =
new AnnotationPersister(resultsPersisterService, anomalyDetectionAuditor);
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings, indexNameExpressionResolver);
JobResultsPersister jobResultsPersister =
new JobResultsPersister(originSettingClient, resultsPersisterService, anomalyDetectionAuditor);

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.annotations;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
/**
* Persists annotations to Elasticsearch index.
*/
public class AnnotationPersister {
private static final Logger logger = LogManager.getLogger(AnnotationPersister.class);
private static final int DEFAULT_BULK_LIMIT = 10_000;
private final ResultsPersisterService resultsPersisterService;
private final AbstractAuditor<?> auditor;
/**
* Execute bulk requests when they reach this size
*/
private final int bulkLimit;
public AnnotationPersister(ResultsPersisterService resultsPersisterService, AbstractAuditor<?> auditor) {
this(resultsPersisterService, auditor, DEFAULT_BULK_LIMIT);
}
// For testing
AnnotationPersister(ResultsPersisterService resultsPersisterService, AbstractAuditor<?> auditor, int bulkLimit) {
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
this.auditor = Objects.requireNonNull(auditor);
this.bulkLimit = bulkLimit;
}
/**
* Persists the given annotation to annotations index.
*
* @param annotationId existing annotation id. If {@code null}, a new annotation will be created and id will be assigned automatically
* @param annotation annotation to be persisted
* @return tuple of the form (annotation id, annotation object)
*/
public Tuple<String, Annotation> persistAnnotation(@Nullable String annotationId, Annotation annotation) {
Objects.requireNonNull(annotation);
String jobId = annotation.getJobId();
BulkResponse bulkResponse = bulkPersisterBuilder(jobId).persistAnnotation(annotationId, annotation).executeRequest();
assert bulkResponse.getItems().length == 1;
return Tuple.tuple(bulkResponse.getItems()[0].getId(), annotation);
}
public Builder bulkPersisterBuilder(String jobId) {
return new Builder(jobId);
}
public class Builder {
private final String jobId;
private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
private Supplier<Boolean> shouldRetry = () -> true;
private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
}
public Builder shouldRetry(Supplier<Boolean> shouldRetry) {
this.shouldRetry = Objects.requireNonNull(shouldRetry);
return this;
}
public Builder persistAnnotation(Annotation annotation) {
return persistAnnotation(null, annotation);
}
public Builder persistAnnotation(@Nullable String annotationId, Annotation annotation) {
Objects.requireNonNull(annotation);
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising annotation", jobId), e);
}
if (bulkRequest.numberOfActions() >= bulkLimit) {
executeRequest();
}
return this;
}
/**
* Execute the bulk action
*/
public BulkResponse executeRequest() {
if (bulkRequest.numberOfActions() == 0) {
return null;
}
logger.trace("[{}] ES API CALL: bulk request with {} actions", () -> jobId, () -> bulkRequest.numberOfActions());
BulkResponse bulkResponse =
resultsPersisterService.bulkIndexWithRetry(
bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg));
bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
return bulkResponse;
}
}
}

View File

@ -21,13 +21,13 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@ -223,19 +223,11 @@ class DatafeedJob {
auditor.warning(jobId, msg);
if (lastDataCheckAnnotationWithId == null) {
lastDataCheckAnnotationWithId =
annotationPersister.persistAnnotation(
null,
annotation,
"[" + jobId + "] failed to create annotation for delayed data checker.");
lastDataCheckAnnotationWithId = annotationPersister.persistAnnotation(null, annotation);
} else {
String annotationId = lastDataCheckAnnotationWithId.v1();
Annotation updatedAnnotation = updateAnnotation(annotation);
lastDataCheckAnnotationWithId =
annotationPersister.persistAnnotation(
annotationId,
updatedAnnotation,
"[" + jobId + "] failed to update annotation for delayed data checker.");
lastDataCheckAnnotationWithId = annotationPersister.persistAnnotation(annotationId, updatedAnnotation);
}
}
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
@ -27,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;

View File

@ -89,23 +89,25 @@ public class JobResultsPersister {
this.auditor = auditor;
}
public Builder bulkPersisterBuilder(String jobId, Supplier<Boolean> shouldRetry) {
return new Builder(jobId, resultsPersisterService, shouldRetry);
public Builder bulkPersisterBuilder(String jobId) {
return new Builder(jobId);
}
public class Builder {
private BulkRequest bulkRequest;
private final String jobId;
private final String indexName;
private final Supplier<Boolean> shouldRetry;
private final ResultsPersisterService resultsPersisterService;
private Supplier<Boolean> shouldRetry = () -> true;
private Builder(String jobId, ResultsPersisterService resultsPersisterService, Supplier<Boolean> shouldRetry) {
private Builder(String jobId) {
this.bulkRequest = new BulkRequest();
this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
bulkRequest = new BulkRequest();
this.shouldRetry = shouldRetry;
this.resultsPersisterService = resultsPersisterService;
this.indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
}
public Builder shouldRetry(Supplier<Boolean> shouldRetry) {
this.shouldRetry = Objects.requireNonNull(shouldRetry);
return this;
}
/**

View File

@ -34,7 +34,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@ -50,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;

View File

@ -21,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
@ -135,7 +135,7 @@ public class AutodetectResultProcessor {
this.process = Objects.requireNonNull(autodetectProcess);
this.flushListener = Objects.requireNonNull(flushListener);
this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
this.clock = Objects.requireNonNull(clock);
this.deleteInterimRequired = true;
@ -314,9 +314,7 @@ public class AutodetectResultProcessor {
updateModelSnapshotOnJob(modelSnapshot);
}
annotationPersister.persistAnnotation(
ModelSnapshot.annotationDocumentId(modelSnapshot),
createModelSnapshotAnnotation(modelSnapshot),
"[" + jobId + "] failed to create annotation for model snapshot.");
ModelSnapshot.annotationDocumentId(modelSnapshot), createModelSnapshotAnnotation(modelSnapshot));
}
Quantiles quantiles = result.getQuantiles();
if (quantiles != null) {

View File

@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.annotations;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationTests;
import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterServiceTests;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.common.collect.Tuple.tuple;
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class AnnotationPersisterTests extends ESTestCase {
private static final String ANNOTATION_ID = "existing_annotation_id";
private static final String JOB_ID = "job_id";
private Client client;
private OriginSettingClient originSettingClient;
private ResultsPersisterService resultsPersisterService;
private AbstractAuditor<?> auditor;
private ArgumentCaptor<BulkRequest> bulkRequestCaptor;
@Before
public void setUpMocks() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
resultsPersisterService = ResultsPersisterServiceTests.buildResultsPersisterService(originSettingClient);
auditor = mock(AbstractAuditor.class);
bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class);
}
@After
public void verifyNoMoreInteractionsWithMocks() {
verify(client, atLeastOnce()).settings();
verify(client, atLeastOnce()).threadPool();
verifyNoMoreInteractions(client, auditor);
}
public void testPersistAnnotation_Create() throws IOException {
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L)))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor);
Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID);
Tuple<String, Annotation> result = persister.persistAnnotation(null, annotation);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
List<BulkRequest> bulkRequests = bulkRequestCaptor.getAllValues();
assertThat(bulkRequests, hasSize(1));
BulkRequest bulkRequest = bulkRequests.get(0);
assertThat(bulkRequest.numberOfActions(), equalTo(1));
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(nullValue()));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
public void testPersistAnnotation_Update() throws IOException {
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L)))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor);
Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID);
Tuple<String, Annotation> result = persister.persistAnnotation(ANNOTATION_ID, annotation);
assertThat(result, is(equalTo(tuple(ANNOTATION_ID, annotation))));
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
List<BulkRequest> bulkRequests = bulkRequestCaptor.getAllValues();
assertThat(bulkRequests, hasSize(1));
BulkRequest bulkRequest = bulkRequests.get(0);
assertThat(bulkRequest.numberOfActions(), equalTo(1));
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), is(equalTo(AnnotationIndex.WRITE_ALIAS_NAME)));
assertThat(indexRequest.id(), is(equalTo(ANNOTATION_ID)));
assertThat(parseAnnotation(indexRequest.source()), is(equalTo(annotation)));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
public void testPersistMultipleAnnotationsWithBulk() {
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L)))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor);
persister.bulkPersisterBuilder(JOB_ID)
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.executeRequest();
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
List<BulkRequest> bulkRequests = bulkRequestCaptor.getAllValues();
assertThat(bulkRequests, hasSize(1));
assertThat(bulkRequests.get(0).numberOfActions(), equalTo(5));
}
public void testPersistMultipleAnnotationsWithBulk_LowBulkLimit() {
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(ANNOTATION_ID) }, 0L)))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor, 2);
persister.bulkPersisterBuilder(JOB_ID)
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation(AnnotationTests.randomAnnotation(JOB_ID))
.executeRequest();
verify(client, times(3)).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
List<BulkRequest> bulkRequests = bulkRequestCaptor.getAllValues();
assertThat(bulkRequests, hasSize(3));
assertThat(bulkRequests.get(0).numberOfActions(), equalTo(2));
assertThat(bulkRequests.get(1).numberOfActions(), equalTo(2));
assertThat(bulkRequests.get(2).numberOfActions(), equalTo(1));
}
public void testPersistMultipleAnnotationsWithBulk_EmptyRequest() {
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor);
assertThat(persister.bulkPersisterBuilder(JOB_ID).executeRequest(), is(nullValue()));
}
public void testPersistMultipleAnnotationsWithBulk_Failure() {
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemFailure("1"), bulkItemFailure("2")}, 0L))) // (1)
.doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemSuccess("1"), bulkItemFailure("2")}, 0L))) // (2)
.doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{bulkItemFailure("2")}, 0L))) // (3)
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
AnnotationPersister persister = new AnnotationPersister(resultsPersisterService, auditor);
AnnotationPersister.Builder persisterBuilder = persister.bulkPersisterBuilder(JOB_ID)
.persistAnnotation("1", AnnotationTests.randomAnnotation(JOB_ID))
.persistAnnotation("2", AnnotationTests.randomAnnotation(JOB_ID));
ElasticsearchException e = expectThrows(ElasticsearchException.class, persisterBuilder::executeRequest);
assertThat(e.getMessage(), containsString("failed to index after"));
verify(client, atLeastOnce()).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
verify(auditor, atLeastOnce()).warning(any(), any());
List<BulkRequest> bulkRequests = bulkRequestCaptor.getAllValues();
assertThat(bulkRequests.get(0).numberOfActions(), equalTo(2)); // Original bulk request of size 2
assertThat(bulkRequests.get(1).numberOfActions(), equalTo(2)); // Bulk request created from two failures returned in (1)
for (int i = 2; i < bulkRequests.size(); ++i) {
assertThat(bulkRequests.get(i).numberOfActions(), equalTo(1)); // Bulk request created from one failure returned in (2) and (3)
}
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
private static BulkItemResponse bulkItemSuccess(String docId) {
return new BulkItemResponse(
1,
DocWriteRequest.OpType.INDEX,
new IndexResponse(new ShardId(AnnotationIndex.WRITE_ALIAS_NAME, "uuid", 1), "doc", docId, 0, 0, 1, true));
}
private static BulkItemResponse bulkItemFailure(String docId) {
return new BulkItemResponse(
2,
DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("my-index", "doc", docId, new Exception("boom")));
}
private Annotation parseAnnotation(BytesReference source) throws IOException {
try (XContentParser parser = createParser(jsonXContent, source)) {
return Annotation.PARSER.parse(parser, null).build();
}
}
}

View File

@ -16,13 +16,13 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;

View File

@ -6,15 +6,21 @@
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
@ -22,25 +28,29 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterServiceTests;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -54,6 +64,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
@ -61,6 +72,7 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -76,11 +88,11 @@ public class DatafeedJobTests extends ESTestCase {
private DataExtractor dataExtractor;
private DatafeedTimingStatsReporter timingStatsReporter;
private Client client;
private ResultsPersisterService resultsPersisterService;
private DelayedDataDetector delayedDataDetector;
private DataDescription.Builder dataDescription;
private ActionFuture<PostDataAction.Response> postDataFuture;
private ActionFuture<FlushJobAction.Response> flushJobFuture;
private ActionFuture<IndexResponse> indexFuture;
private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
private FlushJobAction.Response flushJobResponse;
private String annotationDocId;
@ -98,13 +110,15 @@ public class DatafeedJobTests extends ESTestCase {
timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);
when(client.settings()).thenReturn(Settings.EMPTY);
resultsPersisterService =
ResultsPersisterServiceTests.buildResultsPersisterService(new OriginSettingClient(client, ClientHelper.ML_ORIGIN));
dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
postDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
indexFuture = mock(ActionFuture.class);
annotationDocId = "AnnotationDocId";
flushJobResponse = new FlushJobAction.Response(true, new Date());
delayedDataDetector = mock(DelayedDataDetector.class);
@ -129,8 +143,8 @@ public class DatafeedJobTests extends ESTestCase {
when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
when(indexFuture.actionGet()).thenReturn(new IndexResponse(new ShardId("index", "uuid", 0), "doc", annotationDocId, 0, 0, 0, true));
when(client.index(any())).thenReturn(indexFuture);
doAnswer(withResponse(new BulkResponse(new BulkItemResponse[]{ bulkItemSuccess(annotationDocId) }, 0L)))
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
}
public void testLookBackRunWithEndTime() throws Exception {
@ -272,26 +286,32 @@ public class DatafeedJobTests extends ESTestCase {
10,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000));
Annotation expectedAnnotation = new Annotation.Builder()
.setAnnotation(msg)
.setCreateTime(new Date(currentTime))
.setCreateUsername(XPackUser.NAME)
.setTimestamp(bucket.getTimestamp())
.setEndTimestamp(new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000))
.setJobId(jobId)
.setModifiedTime(new Date(currentTime))
.setModifiedUsername(XPackUser.NAME)
.setType("annotation")
.build();
long annotationCreateTime = currentTime;
{ // What we expect the created annotation to be indexed as
Annotation expectedAnnotation = new Annotation.Builder()
.setAnnotation(msg)
.setCreateTime(new Date(annotationCreateTime))
.setCreateUsername(XPackUser.NAME)
.setTimestamp(bucket.getTimestamp())
.setEndTimestamp(new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000))
.setJobId(jobId)
.setModifiedTime(new Date(annotationCreateTime))
.setModifiedUsername(XPackUser.NAME)
.setType("annotation")
.build();
BytesReference expectedSource =
BytesReference.bytes(expectedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
try (XContentBuilder xContentBuilder = expectedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
request.source(xContentBuilder);
ArgumentCaptor<BulkRequest> bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(client, atMost(2)).execute(eq(BulkAction.INSTANCE), bulkRequestArgumentCaptor.capture(), any());
BulkRequest bulkRequest = bulkRequestArgumentCaptor.getValue();
assertThat(bulkRequest.requests(), hasSize(1));
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME));
assertThat(indexRequest.id(), nullValue());
assertThat(indexRequest.source(), equalTo(expectedSource));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
ArgumentCaptor<IndexRequest> indexRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, atMost(2)).index(indexRequestArgumentCaptor.capture());
assertThat(request.index(), equalTo(indexRequestArgumentCaptor.getValue().index()));
assertThat(request.source(), equalTo(indexRequestArgumentCaptor.getValue().source()));
// Execute a fourth time, this time we return a new delayedDataDetector response to verify annotation gets updated
Bucket bucket2 = mock(Bucket.class);
@ -311,27 +331,34 @@ public class DatafeedJobTests extends ESTestCase {
msg = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
15,
XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(6000));
// What we expect the updated annotation to be indexed as
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME);
indexRequest.id(annotationDocId);
Annotation updatedAnnotation = new Annotation.Builder(expectedAnnotation)
.setAnnotation(msg)
.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000))
.setModifiedTime(new Date(currentTime))
.build();
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
indexRequest.source(xContentBuilder);
long annotationUpdateTime = currentTime;
{ // What we expect the updated annotation to be indexed as
Annotation expectedUpdatedAnnotation = new Annotation.Builder()
.setAnnotation(msg)
.setCreateTime(new Date(annotationCreateTime))
.setCreateUsername(XPackUser.NAME)
.setTimestamp(bucket.getTimestamp())
.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000))
.setJobId(jobId)
.setModifiedTime(new Date(annotationUpdateTime))
.setModifiedUsername(XPackUser.NAME)
.setType("annotation")
.build();
BytesReference expectedSource =
BytesReference.bytes(expectedUpdatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
ArgumentCaptor<BulkRequest> bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class);
verify(client, atMost(2)).execute(eq(BulkAction.INSTANCE), bulkRequestArgumentCaptor.capture(), any());
BulkRequest bulkRequest = bulkRequestArgumentCaptor.getValue();
assertThat(bulkRequest.requests(), hasSize(1));
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
assertThat(indexRequest.index(), equalTo(AnnotationIndex.WRITE_ALIAS_NAME));
assertThat(indexRequest.id(), equalTo(annotationDocId));
assertThat(indexRequest.source(), equalTo(expectedSource));
assertThat(indexRequest.opType(), equalTo(DocWriteRequest.OpType.INDEX));
}
ArgumentCaptor<IndexRequest> updateRequestArgumentCaptor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, atMost(2)).index(updateRequestArgumentCaptor.capture());
assertThat(indexRequest.index(), equalTo(updateRequestArgumentCaptor.getValue().index()));
assertThat(indexRequest.id(), equalTo(updateRequestArgumentCaptor.getValue().id()));
assertThat(indexRequest.source().utf8ToString(),
equalTo(updateRequestArgumentCaptor.getValue().source().utf8ToString()));
assertThat(updateRequestArgumentCaptor.getValue().opType(), equalTo(DocWriteRequest.OpType.INDEX));
// Execute a fifth time, no changes should occur as annotation is the same
currentTime = currentTime + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
inputStream = new ByteArrayInputStream(contentBytes);
@ -461,7 +488,23 @@ public class DatafeedJobTests extends ESTestCase {
long latestRecordTimeMs, boolean haveSeenDataPreviously) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
client, auditor, new AnnotationPersister(client, auditor), currentTimeSupplier, delayedDataDetector, null,
latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously);
client, auditor, new AnnotationPersister(resultsPersisterService, auditor), currentTimeSupplier,
delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, haveSeenDataPreviously);
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
private static BulkItemResponse bulkItemSuccess(String docId) {
return new BulkItemResponse(
1,
DocWriteRequest.OpType.INDEX,
new IndexResponse(new ShardId(AnnotationIndex.WRITE_ALIAS_NAME, "uuid", 1), "doc", docId, 0, 0, 1, true));
}
}

View File

@ -108,7 +108,7 @@ public class JobResultsPersisterTests extends ESTestCase {
AnomalyRecord record = new AnomalyRecord(JOB_ID, new Date(), 600);
bucket.setRecords(Collections.singletonList(record));
persister.bulkPersisterBuilder(JOB_ID, () -> true).persistBucket(bucket).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistBucket(bucket).executeRequest();
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
@ -159,7 +159,7 @@ public class JobResultsPersisterTests extends ESTestCase {
typicals.add(998765.3);
r1.setTypical(typicals);
persister.bulkPersisterBuilder(JOB_ID, () -> true).persistRecords(records).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistRecords(records).executeRequest();
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
@ -194,7 +194,7 @@ public class JobResultsPersisterTests extends ESTestCase {
inf.setProbability(0.4);
influencers.add(inf);
persister.bulkPersisterBuilder(JOB_ID, () -> true).persistInfluencers(influencers).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistInfluencers(influencers).executeRequest();
verify(client).execute(eq(BulkAction.INSTANCE), bulkRequestCaptor.capture(), any());
@ -217,13 +217,13 @@ public class JobResultsPersisterTests extends ESTestCase {
inf.setProbability(0.4);
influencers.add(inf);
JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID, () -> true);
JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID);
builder.persistInfluencers(influencers).executeRequest();
assertEquals(0, builder.getBulkRequest().numberOfActions());
}
public void testBulkRequestExecutesWhenReachMaxDocs() {
JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo", () -> true);
JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo");
ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456, 0);
for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) {
bulkBuilder.persistModelPlot(modelPlot);
@ -240,7 +240,7 @@ public class JobResultsPersisterTests extends ESTestCase {
TimingStats timingStats =
new TimingStats(
"foo", 7, 1.0, 2.0, 1.23, 7.89, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0));
persister.bulkPersisterBuilder(JOB_ID, () -> true).persistTimingStats(timingStats).executeRequest();
persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest();
InOrder inOrder = inOrder(client);
inOrder.verify(client).settings();

View File

@ -30,7 +30,6 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
@ -49,6 +48,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapsho
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
@ -152,7 +152,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobManager = mock(JobManager.class);
jobResultsProvider = mock(JobResultsProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(mock(JobResultsPersister.Builder.class));
JobResultsPersister.Builder bulkPersisterBuilder = mock(JobResultsPersister.Builder.class);
when(bulkPersisterBuilder.shouldRetry(any())).thenReturn(bulkPersisterBuilder);
when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(bulkPersisterBuilder);
jobDataCountsPersister = mock(JobDataCountsPersister.class);
annotationPersister = mock(AnnotationPersister.class);
autodetectCommunicator = mock(AutodetectCommunicator.class);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
@ -39,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.annotations.AnnotationPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
@ -105,8 +105,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
renormalizer = mock(Renormalizer.class);
persister = mock(JobResultsPersister.class);
bulkBuilder = mock(JobResultsPersister.Builder.class);
when(bulkBuilder.shouldRetry(any())).thenReturn(bulkBuilder);
annotationPersister = mock(AnnotationPersister.class);
when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkBuilder);
when(persister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkBuilder);
process = mock(AutodetectProcess.class);
flushListener = mock(FlushListener.class);
processorUnderTest = new AutodetectResultProcessor(
@ -138,7 +139,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
verify(renormalizer).waitUntilIdle();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
}
@ -156,7 +157,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, never()).deleteInterimResults(JOB_ID);
}
@ -173,7 +174,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
verify(bulkBuilder).persistBucket(bucket);
verify(bulkBuilder).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).deleteInterimResults(JOB_ID);
}
@ -190,7 +191,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder).persistRecords(records);
verify(bulkBuilder, never()).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
public void testProcessResult_influencers() {
@ -206,7 +207,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder).persistInfluencers(influencers);
verify(bulkBuilder, never()).executeRequest();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
public void testProcessResult_categoryDefinition() {
@ -220,7 +221,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder, never()).executeRequest();
verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
public void testProcessResult_flushAcknowledgement() {
@ -233,7 +234,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(result);
assertTrue(processorUnderTest.isDeleteInterimRequired());
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
verify(persister).commitResultWrites(JOB_ID);
verify(bulkBuilder).executeRequest();
@ -253,7 +254,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
assertTrue(processorUnderTest.isDeleteInterimRequired());
InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID));
inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
inOrder.verify(bulkBuilder).executeRequest();
inOrder.verify(persister).commitResultWrites(JOB_ID);
@ -268,7 +269,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(bulkBuilder).persistModelPlot(modelPlot);
}
@ -281,7 +282,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.processResult(result);
assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats)));
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
}
@ -313,7 +314,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired two notifications: one for soft_limit and one for hard_limit
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
@ -340,7 +341,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, times(3)).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
@ -356,7 +357,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
when(result.getModelSizeStats()).thenReturn(modelSizeStats);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSizeStats(any(ModelSizeStats.class), any());
// We should have only fired one notification; only the change from ok to warn should have fired, not the subsequent warn
verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_CATEGORIZATION_STATUS_WARN, "warn", 0));
@ -379,12 +380,11 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
ArgumentCaptor<Annotation> annotationCaptor = ArgumentCaptor.forClass(Annotation.class);
verify(annotationPersister).persistAnnotation(
eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture(), any());
verify(annotationPersister).persistAnnotation(eq(ModelSnapshot.annotationDocumentId(modelSnapshot)), annotationCaptor.capture());
Annotation annotation = annotationCaptor.getValue();
Annotation expectedAnnotation =
new Annotation.Builder()
@ -415,7 +415,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistQuantiles(eq(quantiles), any());
verify(bulkBuilder).executeRequest();
verify(persister).commitResultWrites(JOB_ID);
@ -432,7 +432,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.setDeleteInterimRequired(false);
processorUnderTest.processResult(result);
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).persistQuantiles(eq(quantiles), any());
verify(bulkBuilder).executeRequest();
verify(renormalizer).isEnabled();
@ -447,7 +447,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer).waitUntilIdle();
@ -466,7 +466,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.process();
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
}
@ -485,7 +485,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
processorUnderTest.waitForFlushAcknowledgement(JOB_ID, Duration.of(300, ChronoUnit.SECONDS));
assertThat(flushAcknowledgement, is(nullValue()));
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}
public void testKill() throws TimeoutException {
@ -498,7 +498,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
verify(persister).commitResultWrites(JOB_ID);
verify(persister).commitStateWrites(JOB_ID);
verify(renormalizer, never()).renormalize(any());

View File

@ -368,7 +368,7 @@ public class ResultsPersisterServiceTests extends ESTestCase {
};
}
private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
public static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
ThreadPool tp = mock(ThreadPool.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,