Examines the reindex response in order to report potential problems that occurred during the reindexing phase of data frame analytics jobs. Backport of #60911
This commit is contained in:
parent
ab8518fb5b
commit
6062672148
|
@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||||
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
||||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||||
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||||
|
@ -214,6 +215,7 @@ public class DataFrameAnalyticsManager {
|
||||||
// Reindexing is complete; start analytics
|
// Reindexing is complete; start analytics
|
||||||
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
|
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
|
||||||
reindexResponse -> {
|
reindexResponse -> {
|
||||||
|
|
||||||
// If the reindex task is canceled, this listener is called.
|
// If the reindex task is canceled, this listener is called.
|
||||||
// Consequently, we should not signal reindex completion.
|
// Consequently, we should not signal reindex completion.
|
||||||
if (task.isStopping()) {
|
if (task.isStopping()) {
|
||||||
|
@ -222,7 +224,18 @@ public class DataFrameAnalyticsManager {
|
||||||
task.markAsCompleted();
|
task.markAsCompleted();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
task.setReindexingTaskId(null);
|
task.setReindexingTaskId(null);
|
||||||
|
|
||||||
|
Exception reindexError = getReindexError(task.getParams().getId(), reindexResponse);
|
||||||
|
if (reindexError != null) {
|
||||||
|
task.markAsFailed(reindexError);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("[{}] Reindex completed; created [{}]; retries [{}]", task.getParams().getId(),
|
||||||
|
reindexResponse.getCreated(), reindexResponse.getBulkRetries());
|
||||||
|
|
||||||
auditor.info(
|
auditor.info(
|
||||||
config.getId(),
|
config.getId(),
|
||||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
|
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
|
||||||
|
@ -251,6 +264,7 @@ public class DataFrameAnalyticsManager {
|
||||||
reindexRequest.setDestIndex(config.getDest().getIndex());
|
reindexRequest.setDestIndex(config.getDest().getIndex());
|
||||||
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
|
reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
|
||||||
reindexRequest.setParentTask(task.getParentTaskId());
|
reindexRequest.setParentTask(task.getParentTaskId());
|
||||||
|
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
|
||||||
|
|
||||||
final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext();
|
final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext();
|
||||||
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
|
||||||
|
@ -295,6 +309,26 @@ public class DataFrameAnalyticsManager {
|
||||||
new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener);
|
new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Exception getReindexError(String jobId, BulkByScrollResponse reindexResponse) {
|
||||||
|
if (reindexResponse.getBulkFailures().isEmpty() == false) {
|
||||||
|
LOGGER.error("[{}] reindexing encountered {} failures", jobId,
|
||||||
|
reindexResponse.getBulkFailures().size());
|
||||||
|
for (BulkItemResponse.Failure failure : reindexResponse.getBulkFailures()) {
|
||||||
|
LOGGER.error("[{}] reindexing failure: {}", jobId, failure);
|
||||||
|
}
|
||||||
|
return ExceptionsHelper.serverError("reindexing encountered " + reindexResponse.getBulkFailures().size() + " failures");
|
||||||
|
}
|
||||||
|
if (reindexResponse.getReasonCancelled() != null) {
|
||||||
|
LOGGER.error("[{}] reindex task got cancelled with reason [{}]", jobId, reindexResponse.getReasonCancelled());
|
||||||
|
return ExceptionsHelper.serverError("reindex task got cancelled with reason [" + reindexResponse.getReasonCancelled() + "]");
|
||||||
|
}
|
||||||
|
if (reindexResponse.isTimedOut()) {
|
||||||
|
LOGGER.error("[{}] reindex task timed out after [{}]", jobId, reindexResponse.getTook().getStringRep());
|
||||||
|
return ExceptionsHelper.serverError("reindex task timed out after [" + reindexResponse.getTook().getStringRep() + "]");
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isTaskCancelledException(Exception error) {
|
private static boolean isTaskCancelledException(Exception error) {
|
||||||
return ExceptionsHelper.unwrapCause(error) instanceof TaskCancelledException
|
return ExceptionsHelper.unwrapCause(error) instanceof TaskCancelledException
|
||||||
|| ExceptionsHelper.unwrapCause(error.getCause()) instanceof TaskCancelledException;
|
|| ExceptionsHelper.unwrapCause(error.getCause()) instanceof TaskCancelledException;
|
||||||
|
|
|
@ -282,6 +282,7 @@ public class DataFrameDataExtractor {
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
|
||||||
|
.setAllowPartialSearchResults(false)
|
||||||
.setIndices(context.indices)
|
.setIndices(context.indices)
|
||||||
.setSize(0)
|
.setSize(0)
|
||||||
.setQuery(summaryQuery)
|
.setQuery(summaryQuery)
|
||||||
|
|
Loading…
Reference in New Issue