From 39bf8ff24d6389c0434568dfe9fa7cf0d99d41ee Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 27 Sep 2022 12:39:20 +0200 Subject: [PATCH] Fixes #8623 - Use AutoLock in InputStreamResponseListener. Signed-off-by: Simone Bordet --- .../util/InputStreamResponseListener.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java index 429a2dd81d7..433e9ff6008 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,7 @@ public class InputStreamResponseListener extends Listener.Adapter private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class); private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP); - private final Object lock = this; + private final AutoLock.WithCondition lock = new AutoLock.WithCondition(); private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1); private final AtomicReference stream = new AtomicReference<>(); @@ -91,7 +92,7 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void onHeaders(Response response) { - synchronized (lock) + try (AutoLock ignored = lock.lock()) { this.response = response; responseLatch.countDown(); @@ -110,7 +111,7 @@ public class InputStreamResponseListener extends Listener.Adapter } boolean closed; - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { closed = this.closed; if (!closed) @@ -118,7 +119,7 @@ public class InputStreamResponseListener extends Listener.Adapter if (LOG.isDebugEnabled()) LOG.debug("Queueing content {}", content); chunks.add(new Chunk(content, callback)); - lock.notifyAll(); + l.signalAll(); } } @@ -133,11 +134,11 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void onSuccess(Response response) { - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { if (!closed) chunks.add(EOF); - lock.notifyAll(); + l.signalAll(); } if (LOG.isDebugEnabled()) @@ -148,13 +149,13 @@ public class InputStreamResponseListener extends Listener.Adapter public void onFailure(Response response, Throwable failure) { List callbacks; - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { if (this.failure != null) return; this.failure = failure; callbacks = drain(); - lock.notifyAll(); + l.signalAll(); } if (LOG.isDebugEnabled()) @@ -168,7 +169,7 @@ public class InputStreamResponseListener extends Listener.Adapter { Throwable failure = result.getFailure(); List callbacks = Collections.emptyList(); - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { this.result = result; if (result.isFailed() && this.failure == null) @@ -179,7 +180,7 @@ public class InputStreamResponseListener extends Listener.Adapter // Notify the response latch in case of request failures. responseLatch.countDown(); resultLatch.countDown(); - lock.notifyAll(); + l.signalAll(); } if (LOG.isDebugEnabled()) @@ -211,7 +212,7 @@ public class InputStreamResponseListener extends Listener.Adapter boolean expired = !responseLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - synchronized (lock) + try (AutoLock ignored = lock.lock()) { // If the request failed there is no response. if (response == null) @@ -237,7 +238,7 @@ public class InputStreamResponseListener extends Listener.Adapter boolean expired = !resultLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - synchronized (lock) + try (AutoLock ignored = lock.lock()) { return result; } @@ -261,7 +262,7 @@ public class InputStreamResponseListener extends Listener.Adapter private List drain() { List callbacks = new ArrayList<>(); - synchronized (lock) + try (AutoLock ignored = lock.lock()) { while (true) { @@ -294,7 +295,7 @@ public class InputStreamResponseListener extends Listener.Adapter { int result; Callback callback = null; - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { Chunk chunk; while (true) @@ -312,7 +313,7 @@ public class InputStreamResponseListener extends Listener.Adapter if (closed) throw new AsynchronousCloseException(); - lock.wait(); + l.await(); } ByteBuffer buffer = chunk.buffer; @@ -346,13 +347,13 @@ public class InputStreamResponseListener extends Listener.Adapter public void close() throws IOException { List callbacks; - synchronized (lock) + try (AutoLock.WithCondition l = lock.lock()) { if (closed) return; closed = true; callbacks = drain(); - lock.notifyAll(); + l.signalAll(); } if (LOG.isDebugEnabled())