diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java index d804e9d1cbb..f06302b4920 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java @@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel private volatile boolean keepReading = true; + private final Object readMonitor = new Object(); + private final ExecutorService executorService; /** @@ -152,7 +154,11 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel while (true) { if (!keepReading) { try { - Thread.sleep(nextRetrySleepMillis(nTry)); + synchronized (readMonitor) { + if (!keepReading) { + readMonitor.wait(nextRetrySleepMillis(nTry)); + } + } synchronized (lock) { if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) { return; @@ -186,7 +192,15 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel totalInputStreamBytesRead += bytesRead; if (backpressureFuture != null) { keepReading = false; - backpressureFuture.addListener(() -> keepReading = true, Execs.directExecutor()); + backpressureFuture.addListener( + () -> { + synchronized (readMonitor) { + keepReading = true; + readMonitor.notify(); + } + }, + Execs.directExecutor() + ); } else { keepReading = true; // continue adding data to delegate