From cacc3f7ff85383dbf32c8d0f6a8653be6246645d Mon Sep 17 00:00:00 2001 From: Henning Andersen <33268011+henningandersen@users.noreply.github.com> Date: Mon, 1 Jul 2019 09:27:55 +0200 Subject: [PATCH] Async IO Processor release before notify (#43682) This commit changes async IO processor to release the promiseSemaphore before notifying consumers. This ensures that a bad consumer that sometimes does blocking (or otherwise slow) operations does not halt the processor. This should slightly increase the concurrency for shard fsync, but primarily improves safety so that one bad piece of code has less effect on overall system performance. --- .../util/concurrent/AsyncIOProcessor.java | 40 +++++++------- .../concurrent/AsyncIOProcessorTests.java | 53 +++++++++++++++++++ 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java index 9dd76b1bbc9..ad5fd09edc0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java @@ -75,35 +75,33 @@ public abstract class AsyncIOProcessor<Item> { // while we are draining that mean we might exit below too early in the while loop if the drainAndSync call is fast. if (promised || promiseSemaphore.tryAcquire()) { final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>(); - try { - if (promised) { - // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates - // no need to preserve context for listener since it runs in current thread.
- candidates.add(new Tuple<>(item, listener)); - } - // since we made the promise to process we gotta do it here at least once - drainAndProcess(candidates); - } finally { - promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over + if (promised) { + // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates + // no need to preserve context for listener since it runs in current thread. + candidates.add(new Tuple<>(item, listener)); } + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing - try { - drainAndProcess(candidates); - } finally { - promiseSemaphore.release(); - } + drainAndProcessAndRelease(candidates); } } } - private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) { - queue.drainTo(candidates); - processList(candidates); + private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) { + Exception exception; + try { + queue.drainTo(candidates); + exception = processList(candidates); + } finally { + promiseSemaphore.release(); + } + notifyList(candidates, exception); candidates.clear(); } - private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) { + private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates) { Exception exception = null; if (candidates.isEmpty() == false) { try { @@ -114,6 +112,10 @@ public abstract class AsyncIOProcessor<Item> { exception = ex; } } + return exception; + } + + private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) { for (Tuple<Item, Consumer<Exception>> tuple : candidates) { Consumer<Exception> consumer = tuple.v2(); try { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java index
fc18134dcd6..c2c66d12a6e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java @@ -27,9 +27,12 @@ import org.junit.Before; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -239,4 +242,54 @@ public class AsyncIOProcessorTests extends ESTestCase { assertEquals(threadCount, received.get()); threads.forEach(t -> assertFalse(t.isAlive())); } + + public void testSlowConsumer() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor<Object> processor = new AsyncIOProcessor<Object>(logger, scaledRandomIntBetween(1, 2024), threadContext) { + @Override + protected void write(List<Tuple<Object, Consumer<Exception>>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + + int threadCount = randomIntBetween(2, 10); + CyclicBarrier barrier = new CyclicBarrier(threadCount); + Semaphore serializePutSemaphore = new Semaphore(1); + List<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + { + setDaemon(true); + } + + @Override + public void run() { + try { + assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + processor.put(new Object(), (e) -> { + serializePutSemaphore.release(); + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException ex) { + throw new RuntimeException(ex); + }
notified.incrementAndGet(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } }