Async IO Processor release before notify (#43682)

This commit changes async IO processor to release the promiseSemaphore
before notifying consumers. This ensures that a bad consumer that
sometimes does blocking (or otherwise slow) operations does not halt the
processor. This should slightly increase the concurrency for shard
fsync, but primarily improves safety so that one bad piece of code has
less effect on overall system performance.
This commit is contained in:
Henning Andersen 2019-07-01 09:27:55 +02:00 committed by Henning Andersen
parent c593085104
commit cacc3f7ff8
2 changed files with 74 additions and 19 deletions

View File

@ -75,35 +75,33 @@ public abstract class AsyncIOProcessor<Item> {
// while we are draining that mean we might exit below too early in the while loop if the drainAndSync call is fast.
if (promised || promiseSemaphore.tryAcquire()) {
final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
try {
if (promised) {
// we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
// no need to preserve context for listener since it runs in current thread.
candidates.add(new Tuple<>(item, listener));
}
// since we made the promise to process we gotta do it here at least once
drainAndProcess(candidates);
} finally {
promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over
if (promised) {
// we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
// no need to preserve context for listener since it runs in current thread.
candidates.add(new Tuple<>(item, listener));
}
// since we made the promise to process we gotta do it here at least once
drainAndProcessAndRelease(candidates);
while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
// yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
try {
drainAndProcess(candidates);
} finally {
promiseSemaphore.release();
}
drainAndProcessAndRelease(candidates);
}
}
}
private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) {
queue.drainTo(candidates);
processList(candidates);
private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
Exception exception;
try {
queue.drainTo(candidates);
exception = processList(candidates);
} finally {
promiseSemaphore.release();
}
notifyList(candidates, exception);
candidates.clear();
}
private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
Exception exception = null;
if (candidates.isEmpty() == false) {
try {
@ -114,6 +112,10 @@ public abstract class AsyncIOProcessor<Item> {
exception = ex;
}
}
return exception;
}
private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
Consumer<Exception> consumer = tuple.v2();
try {

View File

@ -27,9 +27,12 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -239,4 +242,54 @@ public class AsyncIOProcessorTests extends ESTestCase {
assertEquals(threadCount, received.get());
threads.forEach(t -> assertFalse(t.isAlive()));
}
public void testSlowConsumer() {
AtomicInteger received = new AtomicInteger(0);
AtomicInteger notified = new AtomicInteger(0);
AsyncIOProcessor<Object> processor = new AsyncIOProcessor<Object>(logger, scaledRandomIntBetween(1, 2024), threadContext) {
@Override
protected void write(List<Tuple<Object, Consumer<Exception>>> candidates) throws IOException {
received.addAndGet(candidates.size());
}
};
int threadCount = randomIntBetween(2, 10);
CyclicBarrier barrier = new CyclicBarrier(threadCount);
Semaphore serializePutSemaphore = new Semaphore(1);
List<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) {
{
setDaemon(true);
}
@Override
public void run() {
try {
assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
processor.put(new Object(), (e) -> {
serializePutSemaphore.release();
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException ex) {
throw new RuntimeException(ex);
}
notified.incrementAndGet();
});
}
}).collect(Collectors.toList());
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join(20000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assertEquals(threadCount, notified.get());
assertEquals(threadCount, received.get());
threads.forEach(t -> assertFalse(t.isAlive()));
}
}