[7.x] [ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725) (#53808)
* [ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725) This fixes two issues: - Results persister would retry actions even if they are not intermittent. An example of an persistent failure is a doc mapping problem. - Data frame analytics would continue to retry to persist results even after the job is stopped. closes https://github.com/elastic/elasticsearch/issues/53687
This commit is contained in:
parent
cce60215d8
commit
433952b595
|
@ -108,6 +108,7 @@ public class AnalyticsResultProcessor {
|
|||
}
|
||||
|
||||
public void cancel() {
|
||||
dataFrameRowsJoiner.cancel();
|
||||
isCancelled = true;
|
||||
}
|
||||
|
||||
|
@ -264,12 +265,12 @@ public class AnalyticsResultProcessor {
|
|||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||
WriteRequest.RefreshPolicy.IMMEDIATE,
|
||||
docIdSupplier.apply(analytics.getId()),
|
||||
() -> true,
|
||||
() -> isCancelled == false,
|
||||
errorMsg -> auditor.error(analytics.getId(),
|
||||
"failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
|
||||
);
|
||||
} catch (IOException ioe) {
|
||||
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
|
||||
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
|
||||
}
|
||||
|
|
|
@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
|
|||
private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
|
||||
private LinkedList<RowResults> currentResults;
|
||||
private volatile String failure;
|
||||
private volatile boolean isCancelled;
|
||||
|
||||
DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
|
||||
ResultsPersisterService resultsPersisterService) {
|
||||
DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
|
||||
this.analyticsId = Objects.requireNonNull(analyticsId);
|
||||
this.dataExtractor = Objects.requireNonNull(dataExtractor);
|
||||
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
|
||||
|
@ -70,6 +70,10 @@ class DataFrameRowsJoiner implements AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
isCancelled = true;
|
||||
}
|
||||
|
||||
private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
|
||||
currentResults.add(rowResults);
|
||||
if (currentResults.size() == RESULTS_BATCH_SIZE) {
|
||||
|
@ -87,7 +91,11 @@ class DataFrameRowsJoiner implements AutoCloseable {
|
|||
}
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
resultsPersisterService.bulkIndexWithHeadersWithRetry(
|
||||
dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
|
||||
dataExtractor.getHeaders(),
|
||||
bulkRequest,
|
||||
analyticsId,
|
||||
() -> isCancelled == false,
|
||||
errorMsg -> {});
|
||||
}
|
||||
currentResults = new LinkedList<>();
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
|
@ -33,6 +34,8 @@ import org.elasticsearch.xpack.core.ClientHelper;
|
|||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
@ -42,6 +45,22 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
public class ResultsPersisterService {
|
||||
/**
|
||||
* List of rest statuses that we consider irrecoverable
|
||||
*/
|
||||
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
|
||||
Arrays.asList(
|
||||
RestStatus.GONE,
|
||||
RestStatus.NOT_IMPLEMENTED,
|
||||
RestStatus.NOT_FOUND,
|
||||
RestStatus.BAD_REQUEST,
|
||||
RestStatus.UNAUTHORIZED,
|
||||
RestStatus.FORBIDDEN,
|
||||
RestStatus.METHOD_NOT_ALLOWED,
|
||||
RestStatus.NOT_ACCEPTABLE
|
||||
)
|
||||
));
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);
|
||||
|
||||
public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
|
||||
|
@ -124,9 +143,23 @@ public class ResultsPersisterService {
|
|||
if (bulkResponse.hasFailures() == false) {
|
||||
return bulkResponse;
|
||||
}
|
||||
|
||||
for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
|
||||
if (itemResponse.isFailed()) {
|
||||
if (isIrrecoverable(itemResponse.getFailure().getCause())) {
|
||||
Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
|
||||
LOGGER.warn(new ParameterizedMessage(
|
||||
"[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
|
||||
jobId,
|
||||
bulkResponse.buildFailureMessage()),
|
||||
unwrappedParticular);
|
||||
throw new ElasticsearchException(
|
||||
"{} experienced failure that cannot be automatically retried. See logs for bulk failures",
|
||||
unwrappedParticular,
|
||||
jobId);
|
||||
}
|
||||
}
|
||||
}
|
||||
retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
|
||||
|
||||
// We should only retry the docs that failed.
|
||||
bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
|
||||
}
|
||||
|
@ -148,12 +181,28 @@ public class ResultsPersisterService {
|
|||
} catch (ElasticsearchException e) {
|
||||
LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
|
||||
failureMessage = e.getDetailedMessage();
|
||||
if (isIrrecoverable(e)) {
|
||||
LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
|
||||
throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
|
||||
}
|
||||
}
|
||||
|
||||
retryContext.nextIteration("search", failureMessage);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param ex The exception to check
|
||||
* @return true when the failure will persist no matter how many times we retry.
|
||||
*/
|
||||
private static boolean isIrrecoverable(Exception ex) {
|
||||
Throwable t = ExceptionsHelper.unwrapCause(ex);
|
||||
if (t instanceof ElasticsearchException) {
|
||||
return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
|
||||
*
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.ml.utils.persistence;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
|
@ -28,8 +29,10 @@ import org.elasticsearch.cluster.service.MasterService;
|
|||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
|
@ -133,7 +136,8 @@ public class ResultsPersisterServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSearchWithRetries_SuccessAfterRetryDueToException() {
|
||||
doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
|
||||
doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
|
||||
.doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
|
||||
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
|
||||
|
||||
List<String> messages = new ArrayList<>();
|
||||
|
@ -208,6 +212,21 @@ public class ResultsPersisterServiceTests extends ESTestCase {
|
|||
verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
|
||||
}
|
||||
|
||||
public void testSearchWithRetries_FailureOnIrrecoverableError() {
|
||||
resultsPersisterService.setMaxFailureRetries(5);
|
||||
|
||||
doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
|
||||
.when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
|
||||
|
||||
ElasticsearchException e =
|
||||
expectThrows(
|
||||
ElasticsearchException.class,
|
||||
() -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
|
||||
assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));
|
||||
|
||||
verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
|
||||
}
|
||||
|
||||
private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
|
||||
return new Supplier<Boolean>() {
|
||||
int retries = 0;
|
||||
|
@ -242,6 +261,29 @@ public class ResultsPersisterServiceTests extends ESTestCase {
|
|||
assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
|
||||
}
|
||||
|
||||
public void testBulkRequestChangeOnIrrecoverableFailures() {
|
||||
int maxFailureRetries = 10;
|
||||
resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
|
||||
BulkItemResponse irrecoverable = new BulkItemResponse(
|
||||
2,
|
||||
DocWriteRequest.OpType.INDEX,
|
||||
new BulkItemResponse.Failure("my-index", "_doc", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
|
||||
doAnswerWithResponses(
|
||||
new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
|
||||
new BulkResponse(new BulkItemResponse[0], 0L))
|
||||
.when(client).execute(eq(BulkAction.INSTANCE), any(), any());
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.add(INDEX_REQUEST_FAILURE);
|
||||
bulkRequest.add(INDEX_REQUEST_SUCCESS);
|
||||
|
||||
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
|
||||
() -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));
|
||||
|
||||
verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
|
||||
assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
|
||||
}
|
||||
|
||||
public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
|
||||
doAnswerWithResponses(
|
||||
new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
|
||||
|
@ -317,6 +359,15 @@ public class ResultsPersisterServiceTests extends ESTestCase {
|
|||
};
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <Response> Answer<Response> withFailure(Exception failure) {
|
||||
return invocationOnMock -> {
|
||||
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
|
||||
listener.onFailure(failure);
|
||||
return null;
|
||||
};
|
||||
}
|
||||
|
||||
private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
|
||||
CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
|
||||
ThreadPool tp = mock(ThreadPool.class);
|
||||
|
|
Loading…
Reference in New Issue