From 5b625cea96450f284ad273b5619fb1e2eeaea42d Mon Sep 17 00:00:00 2001 From: Rohan Garg <7731512+rohangarg@users.noreply.github.com> Date: Fri, 18 Nov 2022 18:26:08 +0530 Subject: [PATCH] Improve performance for ReadableInputStreamFrameChannel (#13373) * Improve performance for ReadableInputStreamFrameChannel * Fix race condition leading to unnecessary sleep --- .../ReadableInputStreamFrameChannel.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java index d804e9d1cbb..f06302b4920 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java @@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel private volatile boolean keepReading = true; + private final Object readMonitor = new Object(); + private final ExecutorService executorService; /** @@ -152,7 +154,11 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel while (true) { if (!keepReading) { try { - Thread.sleep(nextRetrySleepMillis(nTry)); + synchronized (readMonitor) { + if (!keepReading) { + readMonitor.wait(nextRetrySleepMillis(nTry)); + } + } synchronized (lock) { if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) { return; @@ -186,7 +192,15 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel totalInputStreamBytesRead += bytesRead; if (backpressureFuture != null) { keepReading = false; - backpressureFuture.addListener(() -> keepReading = true, Execs.directExecutor()); + backpressureFuture.addListener( + () -> { + synchronized (readMonitor) { + keepReading = true; + readMonitor.notify(); + } + }, + Execs.directExecutor() + ); } else { keepReading = true; // continue adding data to delegate