From 60626721482b7d44b1e0c3894789f907cd16bb42 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 11 Aug 2020 16:17:37 +0300 Subject: [PATCH] [7.x][ML] Monitor reindex response in DF analytics (#60911) (#60958) Examines the reindex response in order to report potential problems that occurred during the reindexing phase of data frame analytics jobs. Backport of #60911 --- .../dataframe/DataFrameAnalyticsManager.java | 34 +++++++++++++++++++ .../extractor/DataFrameDataExtractor.java | 1 + 2 files changed, 35 insertions(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 46babd2f1f4..8095c98bcfa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; @@ -214,6 +215,7 @@ public class DataFrameAnalyticsManager { // Reindexing is complete; start analytics ActionListener reindexCompletedListener = ActionListener.wrap( reindexResponse -> { + // If the reindex task is canceled, this listener is called. // Consequently, we should not signal reindex completion. if (task.isStopping()) { @@ -222,7 +224,18 @@ public class DataFrameAnalyticsManager { task.markAsCompleted(); return; } + task.setReindexingTaskId(null); + + Exception reindexError = getReindexError(task.getParams().getId(), reindexResponse); + if (reindexError != null) { + task.markAsFailed(reindexError); + return; + } + + LOGGER.debug("[{}] Reindex completed; created [{}]; retries [{}]", task.getParams().getId(), + reindexResponse.getCreated(), reindexResponse.getBulkRetries()); + auditor.info( config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(), @@ -251,6 +264,7 @@ public class DataFrameAnalyticsManager { reindexRequest.setDestIndex(config.getDest().getIndex()); reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id")); reindexRequest.setParentTask(task.getParentTaskId()); + reindexRequest.getSearchRequest().allowPartialSearchResults(false); final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext(); final Supplier supplier = threadContext.newRestorableContext(false); @@ -295,6 +309,26 @@ public class DataFrameAnalyticsManager { new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener); } + private static Exception getReindexError(String jobId, BulkByScrollResponse reindexResponse) { + if (reindexResponse.getBulkFailures().isEmpty() == false) { + LOGGER.error("[{}] reindexing encountered {} failures", jobId, + reindexResponse.getBulkFailures().size()); + for (BulkItemResponse.Failure failure : reindexResponse.getBulkFailures()) { + LOGGER.error("[{}] reindexing failure: {}", jobId, failure); + } + return ExceptionsHelper.serverError("reindexing encountered " + reindexResponse.getBulkFailures().size() + " failures"); + } + if (reindexResponse.getReasonCancelled() != null) { + LOGGER.error("[{}] reindex task got cancelled with reason [{}]", jobId, reindexResponse.getReasonCancelled()); + return ExceptionsHelper.serverError("reindex task got cancelled with reason [" + reindexResponse.getReasonCancelled() + "]"); + } + if (reindexResponse.isTimedOut()) { + LOGGER.error("[{}] reindex task timed out after [{}]", jobId, reindexResponse.getTook().getStringRep()); + return ExceptionsHelper.serverError("reindex task timed out after [" + reindexResponse.getTook().getStringRep() + "]"); + } + return null; + } + private static boolean isTaskCancelledException(Exception error) { return ExceptionsHelper.unwrapCause(error) instanceof TaskCancelledException || ExceptionsHelper.unwrapCause(error.getCause()) instanceof TaskCancelledException; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java index a431cdbb965..a82fc92a675 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java @@ -282,6 +282,7 @@ public class DataFrameDataExtractor { } return new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setAllowPartialSearchResults(false) .setIndices(context.indices) .setSize(0) .setQuery(summaryQuery)