[7.x] [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls (#62694) (#62784)
* [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls (#62694) * [ML] changing to not use global bulk indexing parameters in conjunction with add(object) calls global parameters, outside of the global index, are ignored for internal callers in certain cases. If the interal caller is adding requests via the following methods: ``` - BulkRequest#add(IndexRequest) - BulkRequest#add(UpdateRequest) - BulkRequest#add(DocWriteRequest) - BulkRequest#add(DocWriteRequest[]) ``` It is better to specifically set the desired parameters on the requests before they are added to the bulk request object. This commit addresses this issue for the ML plugin * unmuting test
This commit is contained in:
parent
1e72144847
commit
77bfb32635
|
@ -118,6 +118,11 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
|||
|
||||
/**
|
||||
* Add a request to the current BulkRequest.
|
||||
*
|
||||
* Note for internal callers: This method does not respect all global parameters.
|
||||
* Only the global index is applied to the request objects.
|
||||
* Global parameters would be respected if the request was serialized for a REST call as it is
|
||||
* in the high level rest client.
|
||||
* @param request Request to add
|
||||
* @return the current bulk request
|
||||
*/
|
||||
|
@ -350,11 +355,35 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note for internal callers (NOT high level rest client),
|
||||
* the global parameter setting is ignored when used with:
|
||||
*
|
||||
* - {@link BulkRequest#add(IndexRequest)}
|
||||
* - {@link BulkRequest#add(UpdateRequest)}
|
||||
* - {@link BulkRequest#add(DocWriteRequest)}
|
||||
* - {@link BulkRequest#add(DocWriteRequest[])} )}
|
||||
* - {@link BulkRequest#add(Iterable)}
|
||||
* @param globalPipeline the global default setting
|
||||
* @return Bulk request with global setting set
|
||||
*/
|
||||
public final BulkRequest pipeline(String globalPipeline) {
|
||||
this.globalPipeline = globalPipeline;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note for internal callers (NOT high level rest client),
|
||||
* the global parameter setting is ignored when used with:
|
||||
*
|
||||
- {@link BulkRequest#add(IndexRequest)}
|
||||
- {@link BulkRequest#add(UpdateRequest)}
|
||||
- {@link BulkRequest#add(DocWriteRequest)}
|
||||
- {@link BulkRequest#add(DocWriteRequest[])} )}
|
||||
- {@link BulkRequest#add(Iterable)}
|
||||
* @param globalRouting the global default setting
|
||||
* @return Bulk request with global setting set
|
||||
*/
|
||||
public final BulkRequest routing(String globalRouting){
|
||||
this.globalRouting = globalRouting;
|
||||
return this;
|
||||
|
@ -382,6 +411,18 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
|||
return globalRequireAlias;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note for internal callers (NOT high level rest client),
|
||||
* the global parameter setting is ignored when used with:
|
||||
*
|
||||
* - {@link BulkRequest#add(IndexRequest)}
|
||||
* - {@link BulkRequest#add(UpdateRequest)}
|
||||
* - {@link BulkRequest#add(DocWriteRequest)}
|
||||
* - {@link BulkRequest#add(DocWriteRequest[])} )}
|
||||
* - {@link BulkRequest#add(Iterable)}
|
||||
* @param globalRequireAlias the global default setting
|
||||
* @return Bulk request with global setting set
|
||||
*/
|
||||
public BulkRequest requireAlias(Boolean globalRequireAlias) {
|
||||
this.globalRequireAlias = globalRequireAlias;
|
||||
return this;
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.utils;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -88,16 +89,28 @@ public final class MlIndexAndAlias {
|
|||
String alias,
|
||||
ActionListener<Boolean> finalListener) {
|
||||
|
||||
final ActionListener<Boolean> loggingListener = ActionListener.wrap(
|
||||
finalListener::onResponse,
|
||||
e -> {
|
||||
logger.error(new ParameterizedMessage(
|
||||
"Failed to create alias and index with pattern [{}] and alias [{}]",
|
||||
indexPatternPrefix,
|
||||
alias),
|
||||
e);
|
||||
finalListener.onFailure(e);
|
||||
}
|
||||
);
|
||||
|
||||
// If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready
|
||||
ActionListener<Boolean> indexCreatedListener = ActionListener.wrap(
|
||||
created -> {
|
||||
if (created) {
|
||||
waitForShardsReady(client, alias, finalListener);
|
||||
waitForShardsReady(client, alias, loggingListener);
|
||||
} else {
|
||||
finalListener.onResponse(false);
|
||||
loggingListener.onResponse(false);
|
||||
}
|
||||
},
|
||||
finalListener::onFailure
|
||||
loggingListener::onFailure
|
||||
);
|
||||
|
||||
boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION);
|
||||
|
@ -135,7 +148,7 @@ public final class MlIndexAndAlias {
|
|||
ActionListener.wrap(
|
||||
unused -> updateWriteAlias(
|
||||
client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, indexCreatedListener),
|
||||
finalListener::onFailure)
|
||||
loggingListener::onFailure)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
@ -146,12 +159,12 @@ public final class MlIndexAndAlias {
|
|||
if (indexPointedByCurrentWriteAlias.isPresent() == false) {
|
||||
assert concreteIndexNames.length > 0;
|
||||
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
|
||||
updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, finalListener);
|
||||
updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, loggingListener);
|
||||
return;
|
||||
}
|
||||
}
|
||||
// If the alias is set, there is nothing more to do.
|
||||
finalListener.onResponse(false);
|
||||
loggingListener.onResponse(false);
|
||||
}
|
||||
|
||||
private static void waitForShardsReady(Client client, String index, ActionListener<Boolean> listener) {
|
||||
|
|
|
@ -16,7 +16,10 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -36,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
|||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -43,6 +47,7 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -51,6 +56,21 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {
|
|||
|
||||
private static final long MS_IN_DAY = TimeValue.timeValueDays(1).millis();
|
||||
|
||||
/**
|
||||
* In production the only way to create a model snapshot is to open a job, and
|
||||
* opening a job ensures that the state index exists. This suite does not open jobs
|
||||
* but instead inserts snapshot and state documents directly to the results and
|
||||
* state indices. This means it needs to create the state index explicitly. This
|
||||
* method should not be copied to test suites that run jobs in the way they are
|
||||
* run in production.
|
||||
*/
|
||||
@Before
|
||||
public void addMlState() {
|
||||
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
|
||||
createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future);
|
||||
future.actionGet();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUpTest() {
|
||||
cleanUp();
|
||||
|
@ -220,12 +240,13 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {
|
|||
private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) {
|
||||
assertThat(numDocs, greaterThan(0));
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest().requireAlias(true);
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (int i = 1; i <= numDocs; ++i) {
|
||||
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
|
||||
.id(ModelState.documentId(jobId, snapshotId, i))
|
||||
// The exact contents of the model state doesn't matter - we are not going to try and restore it
|
||||
.source(Collections.singletonMap("compressed", Collections.singletonList("foo")));
|
||||
.source(Collections.singletonMap("compressed", Collections.singletonList("foo")))
|
||||
.setRequireAlias(true);
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
|
||||
|
|
|
@ -86,7 +86,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
return new NamedXContentRegistry(entries);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/59413")
|
||||
public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws Exception {
|
||||
initialize("regression_single_numeric_feature_and_mixed_data_set");
|
||||
String predictedClassField = DEPENDENT_VARIABLE_FIELD + "_prediction";
|
||||
|
|
|
@ -9,8 +9,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
|
@ -94,6 +96,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
|
||||
import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary;
|
||||
import static org.hamcrest.Matchers.closeTo;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
|
@ -168,6 +171,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
|
|||
};
|
||||
waitForMlTemplates();
|
||||
putJob();
|
||||
// In production opening a job ensures the state index exists. These tests
|
||||
// do not open jobs, but instead feed JSON directly to the results processor.
|
||||
// A a result they must create the index as part of the test setup. Do not
|
||||
// copy this setup to tests that run jobs in the way they are run in production.
|
||||
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
|
||||
createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future);
|
||||
future.get();
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.OriginSettingClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||
|
@ -88,6 +89,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -917,6 +919,9 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
|
|||
}
|
||||
|
||||
private void indexQuantiles(Quantiles quantiles) {
|
||||
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
|
||||
createStateIndexAndAliasIfNecessary(client(), ClusterState.EMPTY_STATE, new IndexNameExpressionResolver(), future);
|
||||
future.actionGet();
|
||||
JobResultsPersister persister =
|
||||
new JobResultsPersister(new OriginSettingClient(client(), ClientHelper.ML_ORIGIN), resultsPersisterService, auditor);
|
||||
persister.persistQuantiles(quantiles, () -> true);
|
||||
|
|
|
@ -74,7 +74,7 @@ public class AnnotationPersister {
|
|||
public class Builder {
|
||||
|
||||
private final String jobId;
|
||||
private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true);
|
||||
private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
private Supplier<Boolean> shouldRetry = () -> true;
|
||||
|
||||
private Builder(String jobId) {
|
||||
|
@ -93,7 +93,7 @@ public class AnnotationPersister {
|
|||
public Builder persistAnnotation(@Nullable String annotationId, Annotation annotation) {
|
||||
Objects.requireNonNull(annotation);
|
||||
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
|
||||
bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder));
|
||||
bulkRequest.add(new IndexRequest().id(annotationId).source(xContentBuilder).setRequireAlias(true));
|
||||
} catch (IOException e) {
|
||||
logger.error(new ParameterizedMessage("[{}] Error serialising annotation", jobId), e);
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ public class AnnotationPersister {
|
|||
BulkResponse bulkResponse =
|
||||
resultsPersisterService.bulkIndexWithRetry(
|
||||
bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg));
|
||||
bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true);
|
||||
bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
|
||||
return bulkResponse;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -188,7 +188,7 @@ public class TrainedModelStatsService {
|
|||
if (stats.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
BulkRequest bulkRequest = new BulkRequest().requireAlias(true);
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add);
|
||||
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
if (bulkRequest.requests().isEmpty()) {
|
||||
|
@ -254,7 +254,8 @@ public class TrainedModelStatsService {
|
|||
// out of band. If there is MANY more than that, something strange is happening and it should fail.
|
||||
.retryOnConflict(3)
|
||||
.id(InferenceStats.docId(stats.getModelId(), stats.getNodeId()))
|
||||
.script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params));
|
||||
.script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, params))
|
||||
.setRequireAlias(true);
|
||||
return updateRequest;
|
||||
} catch (IOException ex) {
|
||||
logger.error(
|
||||
|
|
|
@ -49,7 +49,7 @@ import java.util.Objects;
|
|||
* 2. Document id is extracted from this line.
|
||||
* 3. Document with this id is searched for in .ml-state* indices
|
||||
* 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
|
||||
* Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-writei
|
||||
* Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-write
|
||||
*/
|
||||
public class IndexingStateProcessor implements StateProcessor {
|
||||
|
||||
|
|
|
@ -112,9 +112,9 @@ public class ResultsPersisterService {
|
|||
boolean requireAlias,
|
||||
Supplier<Boolean> shouldRetry,
|
||||
Consumer<String> msgHandler) throws IOException {
|
||||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias);
|
||||
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy);
|
||||
try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) {
|
||||
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
|
||||
bulkRequest.add(new IndexRequest(indexName).id(id).source(content).setRequireAlias(requireAlias));
|
||||
}
|
||||
return bulkIndexWithRetry(bulkRequest, jobId, shouldRetry, msgHandler);
|
||||
}
|
||||
|
@ -288,7 +288,6 @@ public class ResultsPersisterService {
|
|||
private BulkRequest buildNewRequestFromFailures(BulkRequest bulkRequest, BulkResponse bulkResponse) {
|
||||
// If we failed, lets set the bulkRequest to be a collection of the failed requests
|
||||
BulkRequest bulkRequestOfFailures = new BulkRequest();
|
||||
bulkRequestOfFailures.requireAlias(bulkRequest.requireAlias());
|
||||
Set<String> failedDocIds = Arrays.stream(bulkResponse.getItems())
|
||||
.filter(BulkItemResponse::isFailed)
|
||||
.map(BulkItemResponse::getId)
|
||||
|
|
|
@ -178,6 +178,7 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase {
|
|||
|
||||
IndexRequest indexRequest = indexRequestCaptor.getValue();
|
||||
assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias));
|
||||
assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias)));
|
||||
assertThat(indexRequest.id(), equalTo("data_frame_analytics-task_id-progress"));
|
||||
|
||||
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
|
||||
|
|
|
@ -319,6 +319,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
BulkRequest bulkRequest = bulkRequestCaptor.getValue();
|
||||
assertThat(bulkRequest.requests().size(), equalTo(1));
|
||||
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0);
|
||||
assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias)));
|
||||
|
||||
assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias));
|
||||
assertThat(indexRequest.id(), equalTo("foo_quantiles"));
|
||||
|
@ -359,6 +360,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
|
||||
assertThat(indexRequest.index(), equalTo(expectedIndexOrAlias));
|
||||
assertThat(indexRequest.id(), equalTo("foo_quantiles"));
|
||||
assertThat(indexRequest.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias)));
|
||||
}
|
||||
|
||||
public void testPersistQuantilesAsync_QuantilesDocumentCreated() {
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.ml.process;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Timeout;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
@ -104,6 +105,13 @@ public class IndexingStateProcessorTests extends ESTestCase {
|
|||
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
|
||||
verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
|
||||
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
|
||||
ArgumentCaptor<BulkRequest> bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class);
|
||||
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any());
|
||||
for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) {
|
||||
for (DocWriteRequest<?> request : bulkRequest.requests()) {
|
||||
assertThat(request.isRequireAlias(), equalTo(".ml-state-write".equals(expectedIndexOrAlias)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testStateRead_StateDocumentCreated() throws IOException {
|
||||
|
@ -184,5 +192,12 @@ public class IndexingStateProcessorTests extends ESTestCase {
|
|||
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any());
|
||||
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
|
||||
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
|
||||
ArgumentCaptor<BulkRequest> bulkRequestArgumentCaptor = ArgumentCaptor.forClass(BulkRequest.class);
|
||||
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(bulkRequestArgumentCaptor.capture(), any(), any(), any());
|
||||
for (BulkRequest bulkRequest : bulkRequestArgumentCaptor.getAllValues()) {
|
||||
for (DocWriteRequest<?> request : bulkRequest.requests()) {
|
||||
assertTrue(request.isRequireAlias());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue