diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java index 746a66e64be..38fabb47a29 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java @@ -165,6 +165,8 @@ public class RequestNotifier { // Slice the buffer to avoid that listeners peek into data they should not look at. content = content.slice(); + if (!content.hasRemaining()) + return; // Optimized to avoid allocations of iterator instances. List requestListeners = request.getRequestListeners(null); for (int i = 0; i < requestListeners.size(); ++i) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java index d8aa74c75d7..3c8a1e40aba 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java @@ -120,6 +120,8 @@ public class ResponseNotifier { // Slice the buffer to avoid that listeners peek into data they should not look at. buffer = buffer.slice(); + if (!buffer.hasRemaining()) + return; // Optimized to avoid allocations of iterator instances for (int i = 0; i < listeners.size(); ++i) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java index 39844083d47..cd800d8ec3d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/InputStreamResponseListener.java @@ -108,25 +108,36 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void onContent(Response response, ByteBuffer content) { - // Avoid buffering if the input stream is early closed. - if (closed) - return; - - int remaining = content.remaining(); - byte[] bytes = new byte[remaining]; - content.get(bytes); - LOG.debug("Queuing {}/{} bytes", bytes, bytes.length); - queue.offer(bytes); - - long newLength = length.addAndGet(remaining); - while (newLength >= maxBufferSize) + if (!closed) { - LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize); - // Block to avoid infinite buffering - if (!await()) - break; - newLength = length.get(); - LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize); + int remaining = content.remaining(); + if (remaining > 0) + { + + byte[] bytes = new byte[remaining]; + content.get(bytes); + LOG.debug("Queuing {}/{} bytes", bytes, remaining); + queue.offer(bytes); + + long newLength = length.addAndGet(remaining); + while (newLength >= maxBufferSize) + { + LOG.debug("Queued bytes limit {}/{} exceeded, waiting", newLength, maxBufferSize); + // Block to avoid infinite buffering + if (!await()) + break; + newLength = length.get(); + LOG.debug("Queued bytes limit {}/{} exceeded, woken up", newLength, maxBufferSize); + } + } + else + { + LOG.debug("Queuing skipped, empty content {}", content); + } + } + else + { + LOG.debug("Queuing skipped, stream already closed"); } } @@ -305,11 +316,14 @@ public class InputStreamResponseListener extends Listener.Adapter @Override public void close() throws IOException { - LOG.debug("Queuing close {}{}", CLOSED, ""); - queue.offer(CLOSED); - closed = true; - signal(); - super.close(); + if (!closed) + { + super.close(); + LOG.debug("Queuing close {}{}", CLOSED, ""); + queue.offer(CLOSED); + closed = true; + signal(); + } } } }