Align handling of interrupts in BulkProcessor

With this commit we implement a cancellation policy in
BulkProcessor which is aligned for the sync and the async case
and also document it.

Closes #14833.
This commit is contained in:
Daniel Mitterdorfer 2015-12-18 08:19:40 +01:00
parent 7e53076112
commit 56e4752d28
2 changed files with 14 additions and 5 deletions

View File

@ -61,6 +61,9 @@ public class BulkProcessor implements Closeable {
/** /**
* Callback after a failed execution of bulk request. * Callback after a failed execution of bulk request.
*
* Note that in case an instance of <code>InterruptedException</code> is passed, which means that request processing has been
* cancelled externally, the thread's interruption status has been restored prior to calling this method.
*/ */
void afterBulk(long executionId, BulkRequest request, Throwable failure); void afterBulk(long executionId, BulkRequest request, Throwable failure);
} }

View File

@ -74,11 +74,17 @@ abstract class BulkRequestHandler {
.withSyncBackoff(client, bulkRequest); .withSyncBackoff(client, bulkRequest);
afterCalled = true; afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkResponse); listener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (Exception e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("Bulk request {} has been cancelled.", e, executionId);
if (!afterCalled) { if (!afterCalled) {
logger.warn("Failed to executed bulk request {}.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e); listener.afterBulk(executionId, bulkRequest, e);
} }
} catch (Throwable t) {
logger.warn("Failed to execute bulk request {}.", t, executionId);
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, t);
}
} }
} }
@ -135,11 +141,11 @@ abstract class BulkRequestHandler {
}); });
bulkRequestSetupSuccessful = true; bulkRequestSetupSuccessful = true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
// This is intentionally wrong to avoid changing the behaviour implicitly with this PR. It will be fixed in #14833 Thread.currentThread().interrupt();
Thread.interrupted(); logger.info("Bulk request {} has been cancelled.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e); listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Failed to executed bulk request {}.", t, executionId); logger.warn("Failed to execute bulk request {}.", t, executionId);
listener.afterBulk(executionId, bulkRequest, t); listener.afterBulk(executionId, bulkRequest, t);
} finally { } finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore