Merge pull request #8624 from eclipse/jetty-10.0.x-inputstreamresponselistener-autolock

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-09-27 12:45:38 +02:00 committed by GitHub
commit 3c35b2dce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 18 additions and 17 deletions

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,7 +75,7 @@ public class InputStreamResponseListener extends Listener.Adapter
private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class);
private static final Chunk EOF = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private final Object lock = this;
private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
@ -91,7 +92,7 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override
public void onHeaders(Response response)
{
synchronized (lock)
try (AutoLock ignored = lock.lock())
{
this.response = response;
responseLatch.countDown();
@ -110,7 +111,7 @@ public class InputStreamResponseListener extends Listener.Adapter
}
boolean closed;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
closed = this.closed;
if (!closed)
@ -118,7 +119,7 @@ public class InputStreamResponseListener extends Listener.Adapter
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content);
chunks.add(new Chunk(content, callback));
lock.notifyAll();
l.signalAll();
}
}
@ -133,11 +134,11 @@ public class InputStreamResponseListener extends Listener.Adapter
@Override
public void onSuccess(Response response)
{
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (!closed)
chunks.add(EOF);
lock.notifyAll();
l.signalAll();
}
if (LOG.isDebugEnabled())
@ -148,13 +149,13 @@ public class InputStreamResponseListener extends Listener.Adapter
public void onFailure(Response response, Throwable failure)
{
List<Callback> callbacks;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (this.failure != null)
return;
this.failure = failure;
callbacks = drain();
lock.notifyAll();
l.signalAll();
}
if (LOG.isDebugEnabled())
@ -168,7 +169,7 @@ public class InputStreamResponseListener extends Listener.Adapter
{
Throwable failure = result.getFailure();
List<Callback> callbacks = Collections.emptyList();
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
this.result = result;
if (result.isFailed() && this.failure == null)
@ -179,7 +180,7 @@ public class InputStreamResponseListener extends Listener.Adapter
// Notify the response latch in case of request failures.
responseLatch.countDown();
resultLatch.countDown();
lock.notifyAll();
l.signalAll();
}
if (LOG.isDebugEnabled())
@ -211,7 +212,7 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !responseLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
try (AutoLock ignored = lock.lock())
{
// If the request failed there is no response.
if (response == null)
@ -237,7 +238,7 @@ public class InputStreamResponseListener extends Listener.Adapter
boolean expired = !resultLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
try (AutoLock ignored = lock.lock())
{
return result;
}
@ -261,7 +262,7 @@ public class InputStreamResponseListener extends Listener.Adapter
private List<Callback> drain()
{
List<Callback> callbacks = new ArrayList<>();
synchronized (lock)
try (AutoLock ignored = lock.lock())
{
while (true)
{
@ -294,7 +295,7 @@ public class InputStreamResponseListener extends Listener.Adapter
{
int result;
Callback callback = null;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
Chunk chunk;
while (true)
@ -312,7 +313,7 @@ public class InputStreamResponseListener extends Listener.Adapter
if (closed)
throw new AsynchronousCloseException();
lock.wait();
l.await();
}
ByteBuffer buffer = chunk.buffer;
@ -346,13 +347,13 @@ public class InputStreamResponseListener extends Listener.Adapter
public void close() throws IOException
{
List<Callback> callbacks;
synchronized (lock)
try (AutoLock.WithCondition l = lock.lock())
{
if (closed)
return;
closed = true;
callbacks = drain();
lock.notifyAll();
l.signalAll();
}
if (LOG.isDebugEnabled())