diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index af5af80ac2f..43014cfb759 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -61,6 +61,9 @@ public class BulkProcessor implements Closeable { /** * Callback after a failed execution of bulk request. + * + * Note that in case an instance of InterruptedException is passed, which means that request processing has been + * cancelled externally, the thread's interruption status has been restored prior to calling this method. */ void afterBulk(long executionId, BulkRequest request, Throwable failure); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index ffc985bd510..dc98a16c578 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -74,11 +74,17 @@ abstract class BulkRequestHandler { .withSyncBackoff(client, bulkRequest); afterCalled = true; listener.afterBulk(executionId, bulkRequest, bulkResponse); - } catch (Exception e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info("Bulk request {} has been cancelled.", e, executionId); if (!afterCalled) { - logger.warn("Failed to executed bulk request {}.", e, executionId); listener.afterBulk(executionId, bulkRequest, e); } + } catch (Throwable t) { + logger.warn("Failed to execute bulk request {}.", t, executionId); + if (!afterCalled) { + listener.afterBulk(executionId, bulkRequest, t); + } } } @@ -135,11 +141,11 @@ abstract class BulkRequestHandler { }); bulkRequestSetupSuccessful = true; } catch (InterruptedException e) { - // This is intentionally wrong to avoid changing the behaviour implicitly with this PR. It will be fixed in #14833 - Thread.interrupted(); + Thread.currentThread().interrupt(); + logger.info("Bulk request {} has been cancelled.", e, executionId); listener.afterBulk(executionId, bulkRequest, e); } catch (Throwable t) { - logger.warn("Failed to executed bulk request {}.", t, executionId); + logger.warn("Failed to execute bulk request {}.", t, executionId); listener.afterBulk(executionId, bulkRequest, t); } finally { if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore