diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java index 9dd76b1bbc9..ad5fd09edc0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java @@ -75,35 +75,33 @@ public abstract class AsyncIOProcessor { // while we are draining that mean we might exit below too early in the while loop if the drainAndSync call is fast. if (promised || promiseSemaphore.tryAcquire()) { final List>> candidates = new ArrayList<>(); - try { - if (promised) { - // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates - // no need to preserve context for listener since it runs in current thread. - candidates.add(new Tuple<>(item, listener)); - } - // since we made the promise to process we gotta do it here at least once - drainAndProcess(candidates); - } finally { - promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over + if (promised) { + // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates + // no need to preserve context for listener since it runs in current thread. + candidates.add(new Tuple<>(item, listener)); } + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing - try { - drainAndProcess(candidates); - } finally { - promiseSemaphore.release(); - } + drainAndProcessAndRelease(candidates); } } } - private void drainAndProcess(List>> candidates) { - queue.drainTo(candidates); - processList(candidates); + private void drainAndProcessAndRelease(List>> candidates) { + Exception exception; + try { + queue.drainTo(candidates); + exception = processList(candidates); + } finally { + promiseSemaphore.release(); + } + notifyList(candidates, exception); candidates.clear(); } - private void processList(List>> candidates) { + private Exception processList(List>> candidates) { Exception exception = null; if (candidates.isEmpty() == false) { try { @@ -114,6 +112,10 @@ public abstract class AsyncIOProcessor { exception = ex; } } + return exception; + } + + private void notifyList(List>> candidates, Exception exception) { for (Tuple> tuple : candidates) { Consumer consumer = tuple.v2(); try { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java index fc18134dcd6..c2c66d12a6e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java @@ -27,9 +27,12 @@ import org.junit.Before; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -239,4 +242,54 @@ public class AsyncIOProcessorTests extends ESTestCase { assertEquals(threadCount, received.get()); threads.forEach(t -> assertFalse(t.isAlive())); } + + public void testSlowConsumer() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new AsyncIOProcessor(logger, scaledRandomIntBetween(1, 2024), threadContext) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + + int threadCount = randomIntBetween(2, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount); + Semaphore serializePutSemaphore = new Semaphore(1); + List threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + { + setDaemon(true); + } + + @Override + public void run() { + try { + assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + processor.put(new Object(), (e) -> { + serializePutSemaphore.release(); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException ex) { + throw new RuntimeException(ex); + } + notified.incrementAndGet(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } }