From a6da1fd73e23e83e5040ddde5ee609bcfd53214e Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 29 Jul 2020 10:52:36 -0400 Subject: [PATCH] [ML] require alias when indexing to an alias that should be created (#60315) (#60394) This sets up all indexing to one of our write aliases to require that it actually be an alias. This allows failure scenarios to be captured quickly, loudly, and then potentially recovered from. --- .../xpack/ml/integration/DeleteJobIT.java | 1 + .../integration/ModelSnapshotRetentionIT.java | 15 ++++--- .../ml/integration/RevertModelSnapshotIT.java | 1 + .../xpack/ml/MlConfigMigrator.java | 1 + .../ml/annotations/AnnotationPersister.java | 4 +- .../ml/dataframe/DataFrameAnalyticsTask.java | 1 + .../ml/dataframe/stats/StatsPersister.java | 1 + .../inference/TrainedModelStatsService.java | 2 +- .../persistence/JobDataCountsPersister.java | 2 + .../job/persistence/JobResultsPersister.java | 25 ++++++----- .../ml/process/IndexingStateProcessor.java | 4 +- .../persistence/ResultsPersisterService.java | 43 +++++++++++-------- 12 files changed, 60 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java index a2d09ecaa31..a395845ccbd 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -110,6 +110,7 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase { try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) .source(xContentBuilder) + .setRequireAlias(true) 
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java index bc3afbc3381..d7f276450b4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ModelSnapshotRetentionIT.java @@ -203,8 +203,9 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { ModelSnapshot.Builder modelSnapshotBuilder = new ModelSnapshot.Builder(); modelSnapshotBuilder.setJobId(jobId).setSnapshotId(snapshotId).setTimestamp(timestamp).setSnapshotDocCount(numDocs); - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)); - indexRequest.id(ModelSnapshot.documentId(jobId, snapshotId)); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)) + .id(ModelSnapshot.documentId(jobId, snapshotId)) + .setRequireAlias(true); if (immediateRefresh) { indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } @@ -219,12 +220,12 @@ public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase { private void persistModelStateDocs(String jobId, String snapshotId, int numDocs) { assertThat(numDocs, greaterThan(0)); - BulkRequest bulkRequest = new BulkRequest(); + BulkRequest bulkRequest = new BulkRequest().requireAlias(true); for (int i = 1; i <= numDocs; ++i) { - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); - indexRequest.id(ModelState.documentId(jobId, snapshotId, i)); - // The exact contents of the model state doesn't matter - we are not going to try and restore it - 
indexRequest.source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .id(ModelState.documentId(jobId, snapshotId, i)) + // The exact contents of the model state doesn't matter - we are not going to try and restore it + .source(Collections.singletonMap("compressed", Collections.singletonList("foo"))); bulkRequest.add(indexRequest); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index 8c769e1cacb..ce84789db53 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -216,6 +216,7 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase { try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) { return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME) .source(xContentBuilder) + .setRequireAlias(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index eba19a239d4..5ee08b0ce4c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -451,6 +451,7 @@ public class MlConfigMigrator { String documentId = "ml-config"; IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) .id(documentId) + 
.setRequireAlias(true) .opType(DocWriteRequest.OpType.CREATE); ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java index 3744e3e1a5b..ff9add3cd1f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java @@ -74,7 +74,7 @@ public class AnnotationPersister { public class Builder { private final String jobId; - private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); private Supplier shouldRetry = () -> true; private Builder(String jobId) { @@ -115,7 +115,7 @@ public class AnnotationPersister { BulkResponse bulkResponse = resultsPersisterService.bulkIndexWithRetry( bulkRequest, jobId, shouldRetry, msg -> auditor.warning(jobId, "Bulk indexing of annotations failed " + msg)); - bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME); + bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME).requireAlias(true); return bulkResponse; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 5e2f54c1667..d046fbc137d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -299,6 +299,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S } IndexRequest 
indexRequest = new IndexRequest(indexOrAlias) .id(progressDocId) + .setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { new StoredProgress(stats.get().getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java index 4d8179e9fcc..0599482cfd9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsPersister.java @@ -44,6 +44,7 @@ public class StatsPersister { new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")), WriteRequest.RefreshPolicy.NONE, docIdSupplier.apply(jobId), + true, () -> true, errorMsg -> auditor.error(jobId, "failed to persist result with id [" + docIdSupplier.apply(jobId) + "]; " + errorMsg) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 8fb57b5a842..5a434d0cc3c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -174,7 +174,7 @@ public class TrainedModelStatsService { if (stats.isEmpty()) { return; } - BulkRequest bulkRequest = new BulkRequest(); + BulkRequest bulkRequest = new BulkRequest().requireAlias(true); stats.stream().map(TrainedModelStatsService::buildUpdateRequest).filter(Objects::nonNull).forEach(bulkRequest::add); 
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); if (bulkRequest.requests().isEmpty()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 6769e4d790d..ee8ba3b1c10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -65,6 +65,7 @@ public class JobDataCountsPersister { ToXContent.EMPTY_PARAMS, WriteRequest.RefreshPolicy.NONE, DataCounts.documentId(jobId), + true, () -> true, (msg) -> auditor.warning(jobId, "Job data_counts " + msg)); } catch (IOException ioe) { @@ -88,6 +89,7 @@ public class JobDataCountsPersister { try (XContentBuilder content = serialiseCounts(counts)) { final IndexRequest request = new IndexRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId)) .id(DataCounts.documentId(jobId)) + .setRequireAlias(true) .source(content); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, request, new ActionListener() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 43a16b400a8..d50308da865 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -266,7 +266,7 @@ public class JobResultsPersister { public void persistCategoryDefinition(CategoryDefinition category, Supplier shouldRetry) { Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()), category.getJobId(), category, category.getId()); - 
persistable.persist(shouldRetry); + persistable.persist(shouldRetry, true); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -290,7 +290,7 @@ public class JobResultsPersister { : AnomalyDetectorsIndex.jobStateIndexWriteAlias(); Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId); - persistable.persist(shouldRetry); + persistable.persist(shouldRetry, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); } /** @@ -311,7 +311,7 @@ public class JobResultsPersister { Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener); + persistable.persist(listener, AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); }, listener::onFailure ); @@ -345,7 +345,7 @@ public class JobResultsPersister { modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(shouldRetry); + return persistable.persist(shouldRetry, true); } /** @@ -356,7 +356,7 @@ public class JobResultsPersister { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId()); - persistable.persist(shouldRetry); + persistable.persist(shouldRetry, true); } /** @@ -369,7 +369,7 @@ public class JobResultsPersister { Persistable persistable = new Persistable(AnomalyDetectorsIndex.resultsWriteAlias(jobId), jobId, modelSizeStats, modelSizeStats.getId()); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(listener); + persistable.persist(listener, true); } /** @@ -448,7 +448,7 @@ public class JobResultsPersister { new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, 
"true")), DatafeedTimingStats.documentId(timingStats.getJobId())); persistable.setRefreshPolicy(refreshPolicy); - return persistable.persist(() -> true); + return persistable.persist(() -> true, true); } private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException { @@ -483,7 +483,7 @@ public class JobResultsPersister { this.refreshPolicy = refreshPolicy; } - BulkResponse persist(Supplier shouldRetry) { + BulkResponse persist(Supplier shouldRetry, boolean requireAlias) { logCall(indexName); try { return resultsPersisterService.indexWithRetry(jobId, @@ -492,6 +492,7 @@ public class JobResultsPersister { params, refreshPolicy, id, + requireAlias, shouldRetry, (msg) -> auditor.warning(jobId, id + " " + msg)); } catch (IOException e) { @@ -504,11 +505,15 @@ public class JobResultsPersister { } } - void persist(ActionListener listener) { + void persist(ActionListener listener, boolean requireAlias) { logCall(indexName); try (XContentBuilder content = toXContentBuilder(object, params)) { - IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy); + IndexRequest indexRequest = new IndexRequest(indexName) + .id(id) + .source(content) + .setRefreshPolicy(refreshPolicy) + .setRequireAlias(requireAlias); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Error writing [{}]", jobId, (id == null) ? 
"auto-generated ID" : id), e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 2b45aa9bf91..7f4497addc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -137,7 +137,9 @@ public class IndexingStateProcessor implements StateProcessor { } void persist(String indexOrAlias, BytesReference bytes) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + BulkRequest bulkRequest = new BulkRequest() + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .requireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); bulkRequest.add(bytes, indexOrAlias, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index 3adab97b7f5..3c749acb793 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkAction; import 
org.elasticsearch.action.bulk.BulkItemResponse; @@ -44,6 +45,8 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.ExceptionsHelper.status; + public class ResultsPersisterService { /** * List of rest statuses that we consider irrecoverable @@ -52,6 +55,7 @@ public class ResultsPersisterService { Arrays.asList( RestStatus.GONE, RestStatus.NOT_IMPLEMENTED, + // Not found is returned when we require an alias but the index is NOT an alias. RestStatus.NOT_FOUND, RestStatus.BAD_REQUEST, RestStatus.UNAUTHORIZED, @@ -105,9 +109,10 @@ public class ResultsPersisterService { ToXContent.Params params, WriteRequest.RefreshPolicy refreshPolicy, String id, + boolean requireAlias, Supplier shouldRetry, Consumer msgHandler) throws IOException { - BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy); + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(refreshPolicy).requireAlias(requireAlias); try (XContentBuilder content = object.toXContent(XContentFactory.jsonBuilder(), params)) { bulkRequest.add(new IndexRequest(indexName).id(id).source(content)); } @@ -144,19 +149,18 @@ public class ResultsPersisterService { return bulkResponse; } for (BulkItemResponse itemResponse : bulkResponse.getItems()) { - if (itemResponse.isFailed()) { - if (isIrrecoverable(itemResponse.getFailure().getCause())) { - Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause()); - LOGGER.warn(new ParameterizedMessage( - "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]", - jobId, - bulkResponse.buildFailureMessage()), - unwrappedParticular); - throw new ElasticsearchException( - "{} experienced failure that cannot be automatically retried. 
See logs for bulk failures", - unwrappedParticular, - jobId); - } + if (itemResponse.isFailed() && isIrrecoverable(itemResponse.getFailure().getCause())) { + Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause()); + LOGGER.warn(new ParameterizedMessage( + "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]", + jobId, + bulkResponse.buildFailureMessage()), + unwrappedParticular); + throw new ElasticsearchStatusException( + "{} experienced failure that cannot be automatically retried. See logs for bulk failures", + status(unwrappedParticular), + unwrappedParticular, + jobId); } } retryContext.nextIteration("index", bulkResponse.buildFailureMessage()); @@ -183,7 +187,11 @@ public class ResultsPersisterService { failureMessage = e.getDetailedMessage(); if (isIrrecoverable(e)) { LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e); - throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId); + throw new ElasticsearchStatusException( + "{} experienced failure that cannot be automatically retried", + status(e), + e, + jobId); } } @@ -197,10 +205,7 @@ public class ResultsPersisterService { */ private static boolean isIrrecoverable(Exception ex) { Throwable t = ExceptionsHelper.unwrapCause(ex); - if (t instanceof ElasticsearchException) { - return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status()); - } - return false; + return IRRECOVERABLE_REST_STATUSES.contains(status(t)); } /**