From 77bfb32635f862ea4f6cc772f4c975d9d4692785 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 22 Sep 2020 15:07:08 -0400 Subject: [PATCH] [7.x] [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls (#62694) (#62784) * [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls (#62694) * [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls global parameters, outside of the global index, are ignored for internal callers in certain cases. If the interal caller is adding requests via the following methods: ``` - BulkRequest#add(IndexRequest) - BulkRequest#add(UpdateRequest) - BulkRequest#add(DocWriteRequest) - BulkRequest#add(DocWriteRequest[]) ``` It is better to specifically set the desired parameters on the requests before they are added to the bulk request object. This commit addresses this issue for the ML plugin * unmuting test --- .../action/bulk/BulkRequest.java | 41 +++++++++++++++++++ .../xpack/core/ml/utils/MlIndexAndAlias.java | 25 ++++++++--- .../integration/ModelSnapshotRetentionIT.java | 25 ++++++++++- .../xpack/ml/integration/RegressionIT.java | 1 - .../AutodetectResultProcessorIT.java | 10 +++++ .../ml/integration/JobResultsProviderIT.java | 5 +++ .../ml/annotations/AnnotationPersister.java | 6 +-- .../inference/TrainedModelStatsService.java | 5 ++- .../ml/process/IndexingStateProcessor.java | 2 +- .../persistence/ResultsPersisterService.java | 5 +-- .../DataFrameAnalyticsTaskTests.java | 1 + .../persistence/JobResultsPersisterTests.java | 2 + .../process/IndexingStateProcessorTests.java | 15 +++++++ 13 files changed, 125 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index d4ff312d8cf..4024e59bb0e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -118,6 +118,11 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques /** * Add a request to the current BulkRequest. + * + * Note for internal callers: This method does not respect all global parameters. + * Only the global index is applied to the request objects. + * Global parameters would be respected if the request was serialized for a REST call as it is + * in the high level rest client. * @param request Request to add * @return the current bulk request */ @@ -350,11 +355,35 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return this; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + * - {@link BulkRequest#add(IndexRequest)} + * - {@link BulkRequest#add(UpdateRequest)} + * - {@link BulkRequest#add(DocWriteRequest)} + * - {@link BulkRequest#add(DocWriteRequest[])} )} + * - {@link BulkRequest#add(Iterable)} + * @param globalPipeline the global default setting + * @return Bulk request with global setting set + */ public final BulkRequest pipeline(String globalPipeline) { this.globalPipeline = globalPipeline; return this; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + - {@link BulkRequest#add(IndexRequest)} + - {@link BulkRequest#add(UpdateRequest)} + - {@link BulkRequest#add(DocWriteRequest)} + - {@link BulkRequest#add(DocWriteRequest[])} )} + - {@link BulkRequest#add(Iterable)} + * @param globalRouting the global default setting + * @return Bulk request with global setting set + */ public final BulkRequest routing(String globalRouting){ this.globalRouting = globalRouting; return this; @@ -382,6 +411,18 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return globalRequireAlias; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + * - {@link BulkRequest#add(IndexRequest)} + * - {@link BulkRequest#add(UpdateRequest)} + * - {@link BulkRequest#add(DocWriteRequest)} + * - {@link BulkRequest#add(DocWriteRequest[])} )} + * - {@link BulkRequest#add(Iterable)} + * @param globalRequireAlias the global default setting + * @return Bulk request with global setting set + */ public BulkRequest requireAlias(Boolean globalRequireAlias) { this.globalRequireAlias = globalRequireAlias; return this; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 2024c5d2938..00f68246e4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.utils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -88,16 +89,28 @@ public final class MlIndexAndAlias { String alias, ActionListener finalListener) { + final ActionListener loggingListener = ActionListener.wrap( + finalListener::onResponse, + e -> { + logger.error(new ParameterizedMessage( + "Failed to create alias and index with pattern [{}] and alias [{}]", + indexPatternPrefix, + alias), + e); + finalListener.onFailure(e); + } + ); + // If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready ActionListener indexCreatedListener = ActionListener.wrap( created -> { if (created) { - waitForShardsReady(client, alias, finalListener); + waitForShardsReady(client, alias, loggingListener); } else { - finalListener.onResponse(false); + loggingListener.onResponse(false); } }, - finalListener::onFailure + loggingListener::onFailure ); boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION); @@ -135,7 +148,7 @@ public final class MlIndexAndAlias { ActionListener.wrap( unused -> updateWriteAlias( client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, indexCreatedListener), - finalListener::onFailure) + loggingListener::onFailure) ); return; } @@ -146,12 +159,12 @@ public final class MlIndexAndAlias { if (indexPointedByCurrentWriteAlias.isPresent() == false) { assert concreteIndexNames.length > 0; String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); - updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, finalListener); + updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, loggingListener); return; } } // If the alias is set, there is nothing more to do. - finalListener.onResponse(false); + loggingListener.onResponse(false); } private static void waitForShardsReady(Client client, String index, ActionListener listener) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java index d7f276450b4..8ba21efcee2 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -16,7 +16,10 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +47,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -51,6 +56,21 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis(); + /** + * In production the only way to create a model snapshot is to open a job, and + * opening a job ensures that the state index exists. This suite does not open jobs + * but instead inserts snapshot and state documents directly to the results and + * state indices. This means it needs to create the state index explicitly. This + * method should not be copied to test suites that run jobs in the way they are + * run in production. + */ + @Before + public void addMlState() { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.actionGet(); + } + @After public void cleanUpTest() { cleanUp(); @@ -220,12 +240,13 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest().requireAlias(true); + BulkRequest bulkRequest = new BulkRequest(); for (int i = 1; i <= numDocs; ++i) { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) .id(ModelState.documentId(jobId, snapshotId, i)) // The exact contents of the model state doesn't matter - we are not going to try and restore it - .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) + .setRequireAlias(true); bulkRequest.add(indexRequest); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 8a70b3928c4..bfddfca6cbe 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -86,7 +86,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { return new NamedXContentRegistry(entries); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/59413") public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws Exception { initialize("regression_single_numeric_feature_and_mixed_data_set"); String predictedClassField = DEPENDENT_VARIABLE_FIELD + "_prediction"; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 1401dd8b986..ab2bef15b2c 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -9,8 +9,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -94,6 +96,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.stream.Collectors.toList; import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -168,6 +171,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { }; waitForMlTemplates(); putJob(); + // In production opening a job ensures the state index exists. These tests + // do not open jobs, but instead feed JSON directly to the results processor. + // A a result they must create the index as part of the test setup. Do not + // copy this setup to tests that run jobs in the way they are run in production. + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.get(); } @After diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 0d46a66e187..4c6ad04426e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -88,6 +89,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -917,6 +919,9 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { } private void indexQuantiles(Quantiles quantiles) { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.actionGet(); JobResultsPersister persister = new JobResultsPersister(new OriginSettingClient(client(), ClientHelper.ML_ORIGIN), resultsPersisterService, auditor); persister.persistQuantiles(quantiles, () -> true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java index ff9add3cd1f..dffd36e9488 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java @@ -74,7 +74,7 @@ public class AnnotationPersister { public class Builder { private final String jobId; - private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); + private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); private Supplier shouldRetry = () -> true; private Builder(String jobId) { @@ -93,7 +93,7 @@ public class AnnotationPersister { public Builder persistAnnotation(@Nullable String annotationId, Annotation annotation) { Objects.requireNonNull(annotation); try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder)); + bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder).setRequireAlias(true)); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error serialising annotation", jobId), e); } @@ -115,7 +115,7 @@ public class AnnotationPersister { BulkResponse bulkResponse = resultsPersisterService.bulkIndexWithRetry( bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg)); - bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); + bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); return bulkResponse; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index fe0ba993f51..d5a58e62fc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -188,7 +188,7 @@ public class TrainedModelStatsService { if (stats.isEmpty()) { return; } - BulkRequest bulkRequest = new BulkRequest().requireAlias(true); + BulkRequest bulkRequest = new BulkRequest(); stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); if (bulkRequest.requests().isEmpty()) { @@ -254,7 +254,8 @@ public class TrainedModelStatsService { // out of band. If there is MANY more than that, something strange is happening and it should fail. .retryOnConflict(3) .id(InferenceStats.docId(stats.getModelId(), stats.getNodeId())) - .script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params)); + .script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params)) + .setRequireAlias(true); return updateRequest; } catch (IOException ex) { logger.error( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 7f4497addc7..2af0709378a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -49,7 +49,7 @@ import java.util.Objects; * 2. Document id is extracted from this line. * 3. Document with this id is searched for in .ml-state* indices * 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content. - * Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-writei + * Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-write */ public class IndexingStateProcessor implements StateProcessor { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index 98c9bc243f0..0588adbf923 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -112,9 +112,9 @@ public class ResultsPersisterService { boolean requireAlias, Supplier shouldRetry, Consumer msgHandler) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { - bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); + bulkRequest.add(new IndexRequest(indexName).id(id).source(content).setRequireAlias(requireAlias)); } return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler); } @@ -288,7 +288,6 @@ public class ResultsPersisterService { private BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) { // If we failed, lets set the bulkRequest to be a collection of the failed requests BulkRequest bulkRequestOfFailures = new BulkRequest(); - bulkRequestOfFailures.requireAlias(bulkRequest.requireAlias()); Set failedDocIds = Arrays.stream(bulkResponse.getItems()) .filter(BulkItemResponse::isFailed) .map(BulkItemResponse::getId) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java index 3d97bfc8237..aac55f5c134 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java @@ -178,6 +178,7 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase { IndexRequest indexRequest = indexRequestCaptor.getValue(); assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); assertThat(indexRequest.id(), equalTo("data_frame_analytics-task_id-progress")); try (XContentParser parser = JsonXContent.jsonXContent.createParser( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index a5fbd5f8eb2..46f706f7994 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -319,6 +319,7 @@ public class JobResultsPersisterTests extends ESTestCase { BulkRequest bulkRequest = bulkRequestCaptor.getValue(); assertThat(bulkRequest.requests().size(), equalTo(1)); IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); assertThat(indexRequest.id(), equalTo("foo_quantiles")); @@ -359,6 +360,7 @@ public class JobResultsPersisterTests extends ESTestCase { assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); assertThat(indexRequest.id(), equalTo("foo_quantiles")); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); } public void testPersistQuantilesAsync_QuantilesDocumentCreated() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java index e91fb28baf7..8b8dde42ac7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.process; import com.carrotsearch.randomizedtesting.annotations.Timeout; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchRequest; @@ -104,6 +105,13 @@ public class IndexingStateProcessorTests extends ESTestCase { assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString()); verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any()); verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any()); + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(resultsPersisterService, times(3)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any()); + for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) { + for (DocWriteRequest request : bulkRequest.requests()) { + assertThat(request.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); + } + } } public void testStateRead_StateDocumentCreated() throws IOException { @@ -184,5 +192,12 @@ public class IndexingStateProcessorTests extends ESTestCase { verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any()); verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any()); verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any()); + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any()); + for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) { + for (DocWriteRequest request : bulkRequest.requests()) { + assertTrue(request.isRequireAlias()); + } + } } }