diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 24c30f11609..69b57aedb02 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -191,13 +191,33 @@ public class BulkProcessor implements Closeable { } } - @Override /** - * Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed. + * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. */ - public synchronized void close() { + @Override + public void close() { + try { + awaitClose(0, TimeUnit.NANOSECONDS); + } catch(InterruptedException exc) { + Thread.currentThread().interrupt(); + } + } + + /** + * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. + * + * If concurrent requests are not enabled, returns {@code true} immediately. + * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}, + * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned. + * + * @param timeout The maximum time to wait for the bulk requests to complete + * @param unit The time unit of the {@code timeout} argument + * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed + * @throws InterruptedException If the current thread is interrupted + */ + public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { if (closed) { - return; + return true; } closed = true; if (this.scheduledFuture != null) { @@ -207,6 +227,10 @@ public class BulkProcessor implements Closeable { if (bulkRequest.numberOfActions() > 0) { execute(); } + if (this.concurrentRequests < 1) { + return true; + } + return semaphore.tryAcquire(this.concurrentRequests, timeout, unit); } /**