mirror of https://github.com/apache/druid.git
Improve performance for ReadableInputStreamFrameChannel (#13373)
* Improve performance for ReadableInputStreamFrameChannel * Fix race condition leading to unnecessary sleep
This commit is contained in:
parent
092e769dd8
commit
5b625cea96
|
@ -58,6 +58,8 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
|
||||||
|
|
||||||
private volatile boolean keepReading = true;
|
private volatile boolean keepReading = true;
|
||||||
|
|
||||||
|
private final Object readMonitor = new Object();
|
||||||
|
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -152,7 +154,11 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!keepReading) {
|
if (!keepReading) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(nextRetrySleepMillis(nTry));
|
synchronized (readMonitor) {
|
||||||
|
if (!keepReading) {
|
||||||
|
readMonitor.wait(nextRetrySleepMillis(nTry));
|
||||||
|
}
|
||||||
|
}
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) {
|
if (inputStreamFinished || inputStreamError || delegate.isErrorOrFinished()) {
|
||||||
return;
|
return;
|
||||||
|
@ -186,7 +192,15 @@ public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
|
||||||
totalInputStreamBytesRead += bytesRead;
|
totalInputStreamBytesRead += bytesRead;
|
||||||
if (backpressureFuture != null) {
|
if (backpressureFuture != null) {
|
||||||
keepReading = false;
|
keepReading = false;
|
||||||
backpressureFuture.addListener(() -> keepReading = true, Execs.directExecutor());
|
backpressureFuture.addListener(
|
||||||
|
() -> {
|
||||||
|
synchronized (readMonitor) {
|
||||||
|
keepReading = true;
|
||||||
|
readMonitor.notify();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Execs.directExecutor()
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
keepReading = true;
|
keepReading = true;
|
||||||
// continue adding data to delegate
|
// continue adding data to delegate
|
||||||
|
|
Loading…
Reference in New Issue