diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index d4ff312d8cf..4024e59bb0e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -118,6 +118,11 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques /** * Add a request to the current BulkRequest. + * + * Note for internal callers: This method does not respect all global parameters. + * Only the global index is applied to the request objects. + * Global parameters would be respected if the request was serialized for a REST call as it is + * in the high level rest client. * @param request Request to add * @return the current bulk request */ @@ -350,11 +355,35 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return this; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + * - {@link BulkRequest#add(IndexRequest)} + * - {@link BulkRequest#add(UpdateRequest)} + * - {@link BulkRequest#add(DocWriteRequest)} + * - {@link BulkRequest#add(DocWriteRequest[])} )} + * - {@link BulkRequest#add(Iterable)} + * @param globalPipeline the global default setting + * @return Bulk request with global setting set + */ public final BulkRequest pipeline(String globalPipeline) { this.globalPipeline = globalPipeline; return this; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + - {@link BulkRequest#add(IndexRequest)} + - {@link BulkRequest#add(UpdateRequest)} + - {@link BulkRequest#add(DocWriteRequest)} + - {@link BulkRequest#add(DocWriteRequest[])} )} + - {@link BulkRequest#add(Iterable)} + * @param globalRouting the global default setting + * @return Bulk request with global setting set + */ public final BulkRequest routing(String globalRouting){ this.globalRouting = globalRouting; return this; @@ -382,6 +411,18 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques return globalRequireAlias; } + /** + * Note for internal callers (NOT high level rest client), + * the global parameter setting is ignored when used with: + * + * - {@link BulkRequest#add(IndexRequest)} + * - {@link BulkRequest#add(UpdateRequest)} + * - {@link BulkRequest#add(DocWriteRequest)} + * - {@link BulkRequest#add(DocWriteRequest[])} )} + * - {@link BulkRequest#add(Iterable)} + * @param globalRequireAlias the global default setting + * @return Bulk request with global setting set + */ public BulkRequest requireAlias(Boolean globalRequireAlias) { this.globalRequireAlias = globalRequireAlias; return this; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 2024c5d2938..00f68246e4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.utils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -88,16 +89,28 @@ public final class MlIndexAndAlias { String alias, ActionListener finalListener) { + final ActionListener loggingListener = ActionListener.wrap( + finalListener::onResponse, + e -> { + logger.error(new ParameterizedMessage( + "Failed to create alias and index with pattern [{}] and alias [{}]", + indexPatternPrefix, + alias), + e); + finalListener.onFailure(e); + } + ); + // If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready ActionListener indexCreatedListener = ActionListener.wrap( created -> { if (created) { - waitForShardsReady(client, alias, finalListener); + waitForShardsReady(client, alias, loggingListener); } else { - finalListener.onResponse(false); + loggingListener.onResponse(false); } }, - finalListener::onFailure + loggingListener::onFailure ); boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION); @@ -135,7 +148,7 @@ public final class MlIndexAndAlias { ActionListener.wrap( unused -> updateWriteAlias( client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, indexCreatedListener), - finalListener::onFailure) + loggingListener::onFailure) ); return; } @@ -146,12 +159,12 @@ public final class MlIndexAndAlias { if (indexPointedByCurrentWriteAlias.isPresent() == false) { assert concreteIndexNames.length > 0; String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); - updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, finalListener); + updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, loggingListener); return; } } // If the alias is set, there is nothing more to do. - finalListener.onResponse(false); + loggingListener.onResponse(false); } private static void waitForShardsReady(Client client, String index, ActionListener listener) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java index d7f276450b4..8ba21efcee2 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -16,7 +16,10 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +47,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -51,6 +56,21 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis(); + /** + * In production the only way to create a model snapshot is to open a job, and + * opening a job ensures that the state index exists. This suite does not open jobs + * but instead inserts snapshot and state documents directly to the results and + * state indices. This means it needs to create the state index explicitly. This + * method should not be copied to test suites that run jobs in the way they are + * run in production. + */ + @Before + public void addMlState() { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.actionGet(); + } + @After public void cleanUpTest() { cleanUp(); @@ -220,12 +240,13 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest().requireAlias(true); + BulkRequest bulkRequest = new BulkRequest(); for (int i = 1; i <= numDocs; ++i) { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) .id(ModelState.documentId(jobId, snapshotId, i)) // The exact contents of the model state doesn't matter - we are not going to try and restore it - .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))) + .setRequireAlias(true); bulkRequest.add(indexRequest); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 8a70b3928c4..bfddfca6cbe 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -86,7 +86,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { return new NamedXContentRegistry(entries); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/59413") public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws Exception { initialize("regression_single_numeric_feature_and_mixed_data_set"); String predictedClassField = DEPENDENT_VARIABLE_FIELD + "_prediction"; diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 1401dd8b986..ab2bef15b2c 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -9,8 +9,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -94,6 +96,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.stream.Collectors.toList; import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -168,6 +171,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase { }; waitForMlTemplates(); putJob(); + // In production opening a job ensures the state index exists. These tests + // do not open jobs, but instead feed JSON directly to the results processor. + // A a result they must create the index as part of the test setup. Do not + // copy this setup to tests that run jobs in the way they are run in production. + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.get(); } @After diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java index 0d46a66e187..4c6ad04426e 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -88,6 +89,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -917,6 +919,9 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase { } private void indexQuantiles(Quantiles quantiles) { + PlainActionFuture future = new PlainActionFuture<>(); + createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future); + future.actionGet(); JobResultsPersister persister = new JobResultsPersister(new OriginSettingClient(client(), ClientHelper.ML_ORIGIN), resultsPersisterService, auditor); persister.persistQuantiles(quantiles, () -> true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java index ff9add3cd1f..dffd36e9488 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java @@ -74,7 +74,7 @@ public class AnnotationPersister { public class Builder { private final String jobId; - private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); + private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); private Supplier shouldRetry = () -> true; private Builder(String jobId) { @@ -93,7 +93,7 @@ public class AnnotationPersister { public Builder persistAnnotation(@Nullable String annotationId, Annotation annotation) { Objects.requireNonNull(annotation); try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { - bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder)); + bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder).setRequireAlias(true)); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error serialising annotation", jobId), e); } @@ -115,7 +115,7 @@ public class AnnotationPersister { BulkResponse bulkResponse = resultsPersisterService.bulkIndexWithRetry( bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg)); - bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); + bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); return bulkResponse; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index fe0ba993f51..d5a58e62fc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -188,7 +188,7 @@ public class TrainedModelStatsService { if (stats.isEmpty()) { return; } - BulkRequest bulkRequest = new BulkRequest().requireAlias(true); + BulkRequest bulkRequest = new BulkRequest(); stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); if (bulkRequest.requests().isEmpty()) { @@ -254,7 +254,8 @@ public class TrainedModelStatsService { // out of band. If there is MANY more than that, something strange is happening and it should fail. .retryOnConflict(3) .id(InferenceStats.docId(stats.getModelId(), stats.getNodeId())) - .script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params)); + .script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params)) + .setRequireAlias(true); return updateRequest; } catch (IOException ex) { logger.error( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 7f4497addc7..2af0709378a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -49,7 +49,7 @@ import java.util.Objects; * 2. Document id is extracted from this line. * 3. Document with this id is searched for in .ml-state* indices * 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content. - * Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-writei + * Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-write */ public class IndexingStateProcessor implements StateProcessor { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index 98c9bc243f0..0588adbf923 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -112,9 +112,9 @@ public class ResultsPersisterService { boolean requireAlias, Supplier shouldRetry, Consumer msgHandler) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { - bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); + bulkRequest.add(new IndexRequest(indexName).id(id).source(content).setRequireAlias(requireAlias)); } return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler); } @@ -288,7 +288,6 @@ public class ResultsPersisterService { private BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) { // If we failed, lets set the bulkRequest to be a collection of the failed requests BulkRequest bulkRequestOfFailures = new BulkRequest(); - bulkRequestOfFailures.requireAlias(bulkRequest.requireAlias()); Set failedDocIds = Arrays.stream(bulkResponse.getItems()) .filter(BulkItemResponse::isFailed) .map(BulkItemResponse::getId) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java index 3d97bfc8237..aac55f5c134 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java @@ -178,6 +178,7 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase { IndexRequest indexRequest = indexRequestCaptor.getValue(); assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); assertThat(indexRequest.id(), equalTo("data_frame_analytics-task_id-progress")); try (XContentParser parser = JsonXContent.jsonXContent.createParser( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index a5fbd5f8eb2..46f706f7994 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -319,6 +319,7 @@ public class JobResultsPersisterTests extends ESTestCase { BulkRequest bulkRequest = bulkRequestCaptor.getValue(); assertThat(bulkRequest.requests().size(), equalTo(1)); IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); assertThat(indexRequest.id(), equalTo("foo_quantiles")); @@ -359,6 +360,7 @@ public class JobResultsPersisterTests extends ESTestCase { assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias)); assertThat(indexRequest.id(), equalTo("foo_quantiles")); + assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); } public void testPersistQuantilesAsync_QuantilesDocumentCreated() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java index e91fb28baf7..8b8dde42ac7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.process; import com.carrotsearch.randomizedtesting.annotations.Timeout; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchRequest; @@ -104,6 +105,13 @@ public class IndexingStateProcessorTests extends ESTestCase { assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString()); verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any()); verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any()); + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(resultsPersisterService, times(3)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any()); + for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) { + for (DocWriteRequest request : bulkRequest.requests()) { + assertThat(request.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias))); + } + } } public void testStateRead_StateDocumentCreated() throws IOException { @@ -184,5 +192,12 @@ public class IndexingStateProcessorTests extends ESTestCase { verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any()); verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any()); verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any()); + ArgumentCaptor bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class); + verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any()); + for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) { + for (DocWriteRequest request : bulkRequest.requests()) { + assertTrue(request.isRequireAlias()); + } + } } }