diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index a2d09ecaa31..a395845ccbd 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -110,6 +110,7 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) .source(xContentBuilder) + .setRequireAlias(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java index bc3afbc3381..d7f276450b4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -203,8 +203,9 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder(); modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs); - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)); - indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId)); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)) + .id(ModelSnapshot.documentId(jobId, snapshotId)) + .setRequireAlias(true); if (immediateRefresh) { indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } @@ -219,12 +220,12 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest(); + BulkRequest bulkRequest = new BulkRequest().requireAlias(true); for (int i = 1; i <= numDocs; ++i) { - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); - indexRequest.id(ModelState.documentId(jobId, snapshotId, i)); - // The exact contents of the model state doesn't matter - we are not going to try and restore it - indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .id(ModelState.documentId(jobId, snapshotId, i)) + // The exact contents of the model state doesn't matter - we are not going to try and restore it + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); bulkRequest.add(indexRequest); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index 8c769e1cacb..ce84789db53 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -216,6 +216,7 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) .source(xContentBuilder) + .setRequireAlias(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index eba19a239d4..5ee08b0ce4c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -451,6 +451,7 @@ public class MlConfigMigrator { String documentId = "ml-config"; IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) .id(documentId) + .setRequireAlias(true) .opType(DocWriteRequest.OpType.CREATE); ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java index 3744e3e1a5b..ff9add3cd1f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java @@ -74,7 +74,7 @@ public class AnnotationPersister { public class Builder { private final String jobId; - private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); private Supplier shouldRetry = () -> true; private Builder(String jobId) { @@ -115,7 +115,7 @@ public class AnnotationPersister { BulkResponse bulkResponse = resultsPersisterService.bulkIndexWithRetry( bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg)); - bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); return bulkResponse; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 5e2f54c1667..d046fbc137d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -299,6 +299,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } IndexRequest indexRequest = new IndexRequest(indexOrAlias) .id(progressDocId) + .setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { new StoredProgress(stats.get().getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java index 4d8179e9fcc..0599482cfd9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java @@ -44,6 +44,7 @@ public class StatsPersister { new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), WriteRequest.RefreshPolicy.NONE, docIdSupplier.apply(jobId), + true, () -> true, errorMsg -> auditor.error(jobId, "failed to persist result with id [" + docIdSupplier.apply(jobId) + "]; " + errorMsg) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 8fb57b5a842..5a434d0cc3c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -174,7 +174,7 @@ public class TrainedModelStatsService { if (stats.isEmpty()) { return; } - BulkRequest bulkRequest = new BulkRequest(); + BulkRequest bulkRequest = new BulkRequest().requireAlias(true); stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); if (bulkRequest.requests().isEmpty()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 6769e4d790d..ee8ba3b1c10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -65,6 +65,7 @@ public class JobDataCountsPersister { ToXContent.EMPTY_PARAMS, WriteRequest.RefreshPolicy.NONE, DataCounts.documentId(jobId), + true, () -> true, (msg) -> auditor.warning(jobId, "Job data_counts " + msg)); } catch (IOException ioe) { @@ -88,6 +89,7 @@ public class JobDataCountsPersister { try (XContentBuilder content = serialiseCounts(counts)) { final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)) .id(DataCounts.documentId(jobId)) + .setRequireAlias(true) .source(content); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 43a16b400a8..d50308da865 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -266,7 +266,7 @@ public class JobResultsPersister { public void persistCategoryDefinition(CategoryDefinition category, Supplier shouldRetry) { Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()), category.getJobId(), category, category.getId()); - persistable.persist(shouldRetry); + persistable.persist(shouldRetry, true); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -290,7 +290,7 @@ public class JobResultsPersister { : AnomalyDetectorsIndex.jobStateIndexWriteAlias(); Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId); - persistable.persist(shouldRetry); + persistable.persist(shouldRetry, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); } /** @@ -311,7 +311,7 @@ public class JobResultsPersister { Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener); + persistable.persist(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); }, listener::onFailure ); @@ -345,7 +345,7 @@ public class JobResultsPersister { modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(shouldRetry); + return persistable.persist(shouldRetry, true); } /** @@ -356,7 +356,7 @@ public class JobResultsPersister { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId()); - persistable.persist(shouldRetry); + persistable.persist(shouldRetry, true); } /** @@ -369,7 +369,7 @@ public class JobResultsPersister { Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId()); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener); + persistable.persist(listener, true); } /** @@ -448,7 +448,7 @@ public class JobResultsPersister { new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), DatafeedTimingStats.documentId(timingStats.getJobId())); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(() -> true); + return persistable.persist(() -> true, true); } private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException { @@ -483,7 +483,7 @@ public class JobResultsPersister { this.refreshPolicy = refreshPolicy; } - BulkResponse persist(Supplier shouldRetry) { + BulkResponse persist(Supplier shouldRetry, boolean requireAlias) { logCall(indexName); try { return resultsPersisterService.indexWithRetry(jobId, @@ -492,6 +492,7 @@ public class JobResultsPersister { params, refreshPolicy, id, + requireAlias, shouldRetry, (msg) -> auditor.warning(jobId, id + " " + msg)); } catch (IOException e) { @@ -504,11 +505,15 @@ public class JobResultsPersister { } } - void persist(ActionListener listener) { + void persist(ActionListener listener, boolean requireAlias) { logCall(indexName); try (XContentBuilder content = toXContentBuilder(object, params)) { - IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy); + IndexRequest indexRequest = new IndexRequest(indexName) + .id(id) + .source(content) + .setRefreshPolicy(refreshPolicy) + .setRequireAlias(requireAlias); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? "auto-generated ID" : id), e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 2b45aa9bf91..7f4497addc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -137,7 +137,9 @@ public class IndexingStateProcessor implements StateProcessor { } void persist(String indexOrAlias, BytesReference bytes) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + BulkRequest bulkRequest = new BulkRequest() + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .requireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); bulkRequest.add(bytes, indexOrAlias, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index 3adab97b7f5..3c749acb793 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -44,6 +45,8 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.ExceptionsHelper.status; + public class ResultsPersisterService { /** * List of rest statuses that we consider irrecoverable @@ -52,6 +55,7 @@ public class ResultsPersisterService { Arrays.asList( RestStatus.GONE, RestStatus.NOT_IMPLEMENTED, + // Not found is returned when we require an alias but the index is NOT an alias. RestStatus.NOT_FOUND, RestStatus.BAD_REQUEST, RestStatus.UNAUTHORIZED, @@ -105,9 +109,10 @@ public class ResultsPersisterService { ToXContent.Params params, WriteRequest.RefreshPolicy refreshPolicy, String id, + boolean requireAlias, Supplier shouldRetry, Consumer msgHandler) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias); try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); } @@ -144,19 +149,18 @@ public class ResultsPersisterService { return bulkResponse; } for (BulkItemResponse itemResponse : bulkResponse.getItems()) { - if (itemResponse.isFailed()) { - if (isIrrecoverable(itemResponse.getFailure().getCause())) { - Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause()); - LOGGER.warn(new ParameterizedMessage( - "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]", - jobId, - bulkResponse.buildFailureMessage()), - unwrappedParticular); - throw new ElasticsearchException( - "{} experienced failure that cannot be automatically retried. See logs for bulk failures", - unwrappedParticular, - jobId); - } + if (itemResponse.isFailed() && isIrrecoverable(itemResponse.getFailure().getCause())) { + Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause()); + LOGGER.warn(new ParameterizedMessage( + "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]", + jobId, + bulkResponse.buildFailureMessage()), + unwrappedParticular); + throw new ElasticsearchStatusException( + "{} experienced failure that cannot be automatically retried. See logs for bulk failures", + status(unwrappedParticular), + unwrappedParticular, + jobId); } } retryContext.nextIteration("index", bulkResponse.buildFailureMessage()); @@ -183,7 +187,11 @@ public class ResultsPersisterService { failureMessage = e.getDetailedMessage(); if (isIrrecoverable(e)) { LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e); - throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId); + throw new ElasticsearchStatusException( + "{} experienced failure that cannot be automatically retried", + status(e), + e, + jobId); } } @@ -197,10 +205,7 @@ public class ResultsPersisterService { */ private static boolean isIrrecoverable(Exception ex) { Throwable t = ExceptionsHelper.unwrapCause(ex); - if (t instanceof ElasticsearchException) { - return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status()); - } - return false; + return IRRECOVERABLE_REST_STATUSES.contains(status(t)); } /**