Reduce log level for pipeline failure (#54097)

Today we log `failed to execute pipeline for a bulk request` at `ERROR` level
if an attempt to run an ingest pipeline fails. A failure here is commonly due
to an `EsRejectedExecutionException`. We also feed such failures back to the
client and record the rejection in the threadpool statistics.

In line with #51459 there is no need to log failures within actions so noisily
and with such urgency. It is better to leave it up to the client to react
accordingly. Typically an `EsRejectedExecutionException` should result in the
client backing off and retrying, so a failure here is not normally fatal enough
to justify an `ERROR` log at all.

This commit reduces the log level for this message to `DEBUG`.
This commit is contained in:
David Turner 2020-03-24 16:57:17 +00:00
parent 6a60f85bba
commit 21afc788f8
1 changed file with 6 additions and 4 deletions

View File

@ -96,6 +96,8 @@ import static java.util.Collections.emptyMap;
*/
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final Logger logger = LogManager.getLogger(TransportBulkAction.class);
private final ThreadPool threadPool;
private final AutoCreateIndex autoCreateIndex;
private final ClusterService clusterService;
@ -638,7 +640,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return relativeTimeProvider.getAsLong();
}
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
@ -647,7 +649,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
@ -696,7 +698,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);
final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
@ -778,7 +780,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
synchronized void markItemAsFailed(int slot, Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
// We hit a error during preprocessing a request, so we: