Fixes #8623 - Use AutoLock in InputStreamResponseListener.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-09-27 12:39:20 +02:00
parent ab2a65f74f
commit 39bf8ff24d
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
1 changed files with 18 additions and 17 deletions

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -74,7 +75,7 @@ public class InputStreamResponseListener extends Listener.Adapter
private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class); private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class);
private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP); private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private final Object lock = this; private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>(); private final AtomicReference<InputStream> stream = new AtomicReference<>();
@ -91,7 +92,7 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override @Override
public void onHeaders(Response response) public void onHeaders(Response response)
{ {
synchronized (lock) try (AutoLock ignored = lock.lock())
{ {
this.response = response; this.response = response;
responseLatch.countDown(); responseLatch.countDown();
@ -110,7 +111,7 @@ public class InputStreamResponseListener extends Listener.Adapter
} }
boolean closed; boolean closed;
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
closed = this.closed; closed = this.closed;
if (!closed) if (!closed)
@ -118,7 +119,7 @@ public class InputStreamResponseListener extends Listener.Adapter
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content); LOG.debug("Queueing content {}", content);
chunks.add(new Chunk(content, callback)); chunks.add(new Chunk(content, callback));
lock.notifyAll(); l.signalAll();
} }
} }
@ -133,11 +134,11 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override @Override
public void onSuccess(Response response) public void onSuccess(Response response)
{ {
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
if (!closed) if (!closed)
chunks.add(EOF); chunks.add(EOF);
lock.notifyAll(); l.signalAll();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -148,13 +149,13 @@ public class InputStreamResponseListener extends Listener.Adapter
public void onFailure(Response response, Throwable failure) public void onFailure(Response response, Throwable failure)
{ {
List<Callback> callbacks; List<Callback> callbacks;
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
if (this.failure != null) if (this.failure != null)
return; return;
this.failure = failure; this.failure = failure;
callbacks = drain(); callbacks = drain();
lock.notifyAll(); l.signalAll();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -168,7 +169,7 @@ public class InputStreamResponseListener extends Listener.Adapter
{ {
Throwable failure = result.getFailure(); Throwable failure = result.getFailure();
List<Callback> callbacks = Collections.emptyList(); List<Callback> callbacks = Collections.emptyList();
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
this.result = result; this.result = result;
if (result.isFailed() && this.failure == null) if (result.isFailed() && this.failure == null)
@ -179,7 +180,7 @@ public class InputStreamResponseListener extends Listener.Adapter
// Notify the response latch in case of request failures. // Notify the response latch in case of request failures.
responseLatch.countDown(); responseLatch.countDown();
resultLatch.countDown(); resultLatch.countDown();
lock.notifyAll(); l.signalAll();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -211,7 +212,7 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !responseLatch.await(timeout, unit); boolean expired = !responseLatch.await(timeout, unit);
if (expired) if (expired)
throw new TimeoutException(); throw new TimeoutException();
synchronized (lock) try (AutoLock ignored = lock.lock())
{ {
// If the request failed there is no response. // If the request failed there is no response.
if (response == null) if (response == null)
@ -237,7 +238,7 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !resultLatch.await(timeout, unit); boolean expired = !resultLatch.await(timeout, unit);
if (expired) if (expired)
throw new TimeoutException(); throw new TimeoutException();
synchronized (lock) try (AutoLock ignored = lock.lock())
{ {
return result; return result;
} }
@ -261,7 +262,7 @@ public class InputStreamResponseListener extends Listener.Adapter
private List<Callback> drain() private List<Callback> drain()
{ {
List<Callback> callbacks = new ArrayList<>(); List<Callback> callbacks = new ArrayList<>();
synchronized (lock) try (AutoLock ignored = lock.lock())
{ {
while (true) while (true)
{ {
@ -294,7 +295,7 @@ public class InputStreamResponseListener extends Listener.Adapter
{ {
int result; int result;
Callback callback = null; Callback callback = null;
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
Chunk chunk; Chunk chunk;
while (true) while (true)
@ -312,7 +313,7 @@ public class InputStreamResponseListener extends Listener.Adapter
if (closed) if (closed)
throw new AsynchronousCloseException(); throw new AsynchronousCloseException();
lock.wait(); l.await();
} }
ByteBuffer buffer = chunk.buffer; ByteBuffer buffer = chunk.buffer;
@ -346,13 +347,13 @@ public class InputStreamResponseListener extends Listener.Adapter
public void close() throws IOException public void close() throws IOException
{ {
List<Callback> callbacks; List<Callback> callbacks;
synchronized (lock) try (AutoLock.WithCondition l = lock.lock())
{ {
if (closed) if (closed)
return; return;
closed = true; closed = true;
callbacks = drain(); callbacks = drain();
lock.notifyAll(); l.signalAll();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())