From 057575f1cb8fd2c0c35e8eeeb26b726c1d4cea99 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 26 Feb 2019 16:32:37 +0100 Subject: [PATCH] Issue #3373 - OutOfMemoryError: Java heap space in GZIPContentDecoder. Modified jetty-client content decoding to be fully non-blocking; this allows for a better backpressure and less usage of the buffer pool. Modified GZIPContentDecoder to aggregate decoded ByteBuffers in a smarter way that avoids too many data copies and pollution of the buffer pool with intermediate size buffers. Removed duplicate test GZIPContentDecoderTest. Improved javadocs and improved AsyncMiddleManServlet to release buffers used by the GZIPContentDecoder. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/ContentDecoder.java | 9 + .../jetty/client/GZIPContentDecoder.java | 11 +- .../eclipse/jetty/client/HttpReceiver.java | 76 +++-- .../jetty/client/GZIPContentDecoderTest.java | 288 ------------------ .../jetty/client/HttpClientGZIPTest.java | 84 +++-- .../jetty/http/GZIPContentDecoder.java | 178 ++++++----- .../jetty/http/GZIPContentDecoderTest.java | 79 +++-- .../jetty/proxy/AsyncMiddleManServlet.java | 37 ++- .../proxy/AsyncMiddleManServletTest.java | 178 +++++------ 9 files changed, 371 insertions(+), 569 deletions(-) delete mode 100644 jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ContentDecoder.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ContentDecoder.java index 471823f3d7f..c4449b703f8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ContentDecoder.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ContentDecoder.java @@ -35,6 +35,15 @@ public interface ContentDecoder */ public abstract ByteBuffer decode(ByteBuffer buffer); + /** + *

Releases the ByteBuffer returned by {@link #decode(ByteBuffer)}.

+ * + * @param decoded the ByteBuffer returned by {@link #decode(ByteBuffer)} + */ + public default void release(ByteBuffer decoded) + { + } + /** * Factory for {@link ContentDecoder}s; subclasses must implement {@link #newContentDecoder()}. *

diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java index 857d634f044..7a02c1b4f04 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.client; +import java.nio.ByteBuffer; + import org.eclipse.jetty.io.ByteBufferPool; /** @@ -25,7 +27,7 @@ import org.eclipse.jetty.io.ByteBufferPool; */ public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder { - private static final int DEFAULT_BUFFER_SIZE = 2048; + public static final int DEFAULT_BUFFER_SIZE = 8192; public GZIPContentDecoder() { @@ -42,6 +44,13 @@ public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecode super(byteBufferPool, bufferSize); } + @Override + protected boolean decodedChunk(ByteBuffer chunk) + { + super.decodedChunk(chunk); + return true; + } + /** * Specialized {@link ContentDecoder.Factory} for the "gzip" encoding. */ diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 7690791347b..63fe07ef5d2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.client; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -37,7 +36,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.CountingCallback; +import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.component.Destroyable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -339,35 +338,7 @@ public abstract class HttpReceiver } else { - try - { - List decodeds = new ArrayList<>(2); - while (buffer.hasRemaining()) - { - ByteBuffer decoded = decoder.decode(buffer); - if (!decoded.hasRemaining()) - continue; - decodeds.add(decoded); - if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - } - - if (decodeds.isEmpty()) - { - callback.succeeded(); - } - else - { - int size = decodeds.size(); - CountingCallback counter = new CountingCallback(callback, size); - for (ByteBuffer decoded : decodeds) - notifier.notifyContent(response, decoded, counter, contentListeners); - } - } - catch (Throwable x) - { - callback.failed(x); - } + new Decoder(notifier, response, decoder, buffer, callback).iterate(); } if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) @@ -615,4 +586,47 @@ public abstract class HttpReceiver */ FAILURE } + + private class Decoder extends IteratingNestedCallback + { + private final ResponseNotifier notifier; + private final HttpResponse response; + private final ContentDecoder decoder; + private final ByteBuffer buffer; + private ByteBuffer decoded; + + public Decoder(ResponseNotifier notifier, HttpResponse response, ContentDecoder decoder, ByteBuffer buffer, Callback callback) + { + super(callback); + this.notifier = notifier; + this.response = response; + this.decoder = decoder; + this.buffer = buffer; + } + + @Override + protected Action process() throws Throwable + { + while (true) + { + decoded = decoder.decode(buffer); + if (decoded.hasRemaining()) + break; + if (!buffer.hasRemaining()) + return Action.SUCCEEDED; + } + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + + notifier.notifyContent(response, decoded, this, contentListeners); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + decoder.release(decoded); + super.succeeded(); + } + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java deleted file mode 100644 index c627ba84547..00000000000 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java +++ /dev/null @@ -1,288 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import org.junit.jupiter.api.Test; - -@Deprecated -public class GZIPContentDecoderTest -{ - @Test - public void testStreamNoBlocks() throws Exception - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.close(); - byte[] bytes = baos.toByteArray(); - - GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1); - int read = input.read(); - assertEquals(-1, read); - } - - @Test - public void testStreamBigBlockOneByteAtATime() throws Exception - { - String data = "0123456789ABCDEF"; - for (int i = 0; i < 10; ++i) - data += data; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - baos = new ByteArrayOutputStream(); - GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1); - int read; - while ((read = input.read()) >= 0) - baos.write(read); - assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8)); - } - - @Test - public void testNoBlocks() throws Exception - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.close(); - byte[] bytes = baos.toByteArray(); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); - assertEquals(0, decoded.remaining()); - } - - @Test - public void testSmallBlock() throws Exception - { - String data = "0"; - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); - assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); - } - - @Test - public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception - { - String data = "0"; - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - // The header is 10 bytes, chunk at 11 bytes - byte[] bytes1 = new byte[11]; - System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); - byte[] bytes2 = new byte[bytes.length - bytes1.length]; - System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); - assertEquals(0, decoded.capacity()); - decoded = decoder.decode(ByteBuffer.wrap(bytes2)); - assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); - } - - @Test - public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception - { - String data = "0"; - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - // The trailer is 8 bytes, chunk the last 9 bytes - byte[] bytes1 = new byte[bytes.length - 9]; - System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); - byte[] bytes2 = new byte[bytes.length - bytes1.length]; - System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); - assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); - assertFalse(decoder.isFinished()); - decoded = decoder.decode(ByteBuffer.wrap(bytes2)); - assertEquals(0, decoded.remaining()); - assertTrue(decoder.isFinished()); - } - - @Test - public void testSmallBlockWithGZIPTrailerChunked() throws Exception - { - String data = "0"; - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - // The trailer is 4+4 bytes, chunk the last 3 bytes - byte[] bytes1 = new byte[bytes.length - 3]; - System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); - byte[] bytes2 = new byte[bytes.length - bytes1.length]; - System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); - assertEquals(0, decoded.capacity()); - decoded = decoder.decode(ByteBuffer.wrap(bytes2)); - assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); - } - - @Test - public void testTwoSmallBlocks() throws Exception - { - String data1 = "0"; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data1.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes1 = baos.toByteArray(); - - String data2 = "1"; - baos = new ByteArrayOutputStream(); - output = new GZIPOutputStream(baos); - output.write(data2.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes2 = baos.toByteArray(); - - byte[] bytes = new byte[bytes1.length + bytes2.length]; - System.arraycopy(bytes1, 0, bytes, 0, bytes1.length); - System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length); - - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - ByteBuffer decoded = decoder.decode(buffer); - assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString()); - assertTrue(decoder.isFinished()); - assertTrue(buffer.hasRemaining()); - decoded = decoder.decode(buffer); - assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString()); - assertTrue(decoder.isFinished()); - assertFalse(buffer.hasRemaining()); - } - - @Test - public void testBigBlock() throws Exception - { - String data = "0123456789ABCDEF"; - for (int i = 0; i < 10; ++i) - data += data; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - String result = ""; - GZIPContentDecoder decoder = new GZIPContentDecoder(); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - while (buffer.hasRemaining()) - { - ByteBuffer decoded = decoder.decode(buffer); - result += StandardCharsets.UTF_8.decode(decoded).toString(); - } - assertEquals(data, result); - } - - @Test - public void testBigBlockOneByteAtATime() throws Exception - { - String data = "0123456789ABCDEF"; - for (int i = 0; i < 10; ++i) - data += data; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes = baos.toByteArray(); - - String result = ""; - GZIPContentDecoder decoder = new GZIPContentDecoder(64); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - while (buffer.hasRemaining()) - { - ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()})); - if (decoded.hasRemaining()) - result += StandardCharsets.UTF_8.decode(decoded).toString(); - } - assertEquals(data, result); - assertTrue(decoder.isFinished()); - } - - @Test - public void testBigBlockWithExtraBytes() throws Exception - { - String data1 = "0123456789ABCDEF"; - for (int i = 0; i < 10; ++i) - data1 += data1; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - GZIPOutputStream output = new GZIPOutputStream(baos); - output.write(data1.getBytes(StandardCharsets.UTF_8)); - output.close(); - byte[] bytes1 = baos.toByteArray(); - - String data2 = "HELLO"; - byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8); - - byte[] bytes = new byte[bytes1.length + bytes2.length]; - System.arraycopy(bytes1, 0, bytes, 0, bytes1.length); - System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length); - - String result = ""; - GZIPContentDecoder decoder = new GZIPContentDecoder(64); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - while (buffer.hasRemaining()) - { - ByteBuffer decoded = decoder.decode(buffer); - if (decoded.hasRemaining()) - result += StandardCharsets.UTF_8.decode(decoded).toString(); - if (decoder.isFinished()) - break; - } - assertEquals(data1, result); - assertTrue(buffer.hasRemaining()); - assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString()); - } -} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java index 0e47e96617b..acd6232cec0 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java @@ -18,29 +18,36 @@ package org.eclipse.jetty.client; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; -import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + public class HttpClientGZIPTest extends AbstractHttpClientServerTest { @ParameterizedTest @@ -48,12 +55,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest public void testGZIPContentEncoding(Scenario scenario) throws Exception { final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { - baseRequest.setHandled(true); response.setHeader("Content-Encoding", "gzip"); GZIPOutputStream gzipOutput = new GZIPOutputStream(response.getOutputStream()); gzipOutput.write(data); @@ -75,12 +81,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest public void testGZIPContentOneByteAtATime(Scenario scenario) throws Exception { final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { - baseRequest.setHandled(true); response.setHeader("Content-Encoding", "gzip"); ByteArrayOutputStream gzipData = new ByteArrayOutputStream(); @@ -112,12 +117,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest public void testGZIPContentSentTwiceInOneWrite(Scenario scenario) throws Exception { final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { - baseRequest.setHandled(true); response.setHeader("Content-Encoding", "gzip"); ByteArrayOutputStream gzipData = new ByteArrayOutputStream(); @@ -164,12 +168,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest private void testGZIPContentFragmented(Scenario scenario, final int fragment) throws Exception { final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { - baseRequest.setHandled(true); response.setHeader("Content-Encoding", "gzip"); ByteArrayOutputStream gzipData = new ByteArrayOutputStream(); @@ -204,12 +207,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest @ArgumentsSource(ScenarioProvider.class) public void testGZIPContentCorrupted(Scenario scenario) throws Exception { - start(scenario, new AbstractHandler() + start(scenario, new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { - baseRequest.setHandled(true); response.setHeader("Content-Encoding", "gzip"); // Not gzipped, will cause the client to blow up. response.getOutputStream().print("0123456789"); @@ -228,6 +230,46 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testLargeGZIPContentDoesNotPolluteByteBufferPool(Scenario scenario) throws Exception + { + String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + Random random = new Random(); + byte[] content = new byte[1024 * 1024]; + for (int i = 0; i < content.length; ++i) + content[i] = (byte)digits.charAt(random.nextInt(digits.length())); + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name()); + response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(content); + gzip.finish(); + } + }); + + ByteBufferPool pool = client.getByteBufferPool(); + assumeTrue(pool instanceof MappedByteBufferPool); + MappedByteBufferPool bufferPool = (MappedByteBufferPool)pool; + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertArrayEquals(content, response.getContent()); + + long directMemory = bufferPool.getMemory(true); + assertThat(directMemory, lessThan((long)content.length)); + long heapMemory = bufferPool.getMemory(false); + assertThat(heapMemory, lessThan((long)content.length)); + } + private static void sleep(long ms) throws IOException { try diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java index e56bab22639..6ca334aba29 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java @@ -18,7 +18,10 @@ package org.eclipse.jetty.http; +import java.nio.Buffer; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.zip.DataFormatException; import java.util.zip.Inflater; import java.util.zip.ZipException; @@ -28,13 +31,13 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.component.Destroyable; /** - * Decoder for the "gzip" encoding. - *

- * A decoder that inflates gzip compressed data that has been - * optimized for async usage with minimal data copies. + *

Decoder for the "gzip" content encoding.

+ *

This decoder inflates gzip compressed data, and has + * been optimized for async usage with minimal data copies.

*/ public class GZIPContentDecoder implements Destroyable { + private final List _inflateds = new ArrayList<>(); private final Inflater _inflater = new Inflater(true); private final ByteBufferPool _pool; private final int _bufferSize; @@ -46,14 +49,14 @@ public class GZIPContentDecoder implements Destroyable public GZIPContentDecoder() { - this(null,2048); + this(null, 2048); } public GZIPContentDecoder(int bufferSize) { - this(null,bufferSize); + this(null, bufferSize); } - + public GZIPContentDecoder(ByteBufferPool pool, int bufferSize) { _bufferSize = bufferSize; @@ -61,68 +64,95 @@ public class GZIPContentDecoder implements Destroyable reset(); } - /** Inflate compressed data from a buffer. - * - * @param compressed Buffer containing compressed data. - * @return Buffer containing inflated data. + /** + *

Inflates compressed data from a buffer.

+ *

The buffers returned by this method should be released + * via {@link #release(ByteBuffer)}.

+ *

This method may fully consume the input buffer, but return + * only a chunk of the inflated bytes, to allow applications to + * consume the inflated chunk before performing further inflation, + * applying backpressure. In this case, this method should be + * invoked again with the same input buffer (even if + * it's already fully consumed) and that will produce another + * chunk of inflated bytes. Termination happens when the input + * buffer is fully consumed, and the returned buffer is empty.

+ *

See {@link #decodedChunk(ByteBuffer)} to perform inflating + * in a non-blocking way that allows to apply backpressure.

+ * + * @param compressed the buffer containing compressed data. + * @return a buffer containing inflated data. */ public ByteBuffer decode(ByteBuffer compressed) { decodeChunks(compressed); - if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE ) - return BufferUtil.EMPTY_BUFFER; - - ByteBuffer result = _inflated; - _inflated = null; - return result; - } - /** Called when a chunk of data is inflated. - *

The default implementation aggregates all the chunks - * into a single buffer returned from {@link #decode(ByteBuffer)}. - * Derived implementations may choose to consume chunks individually - * and return false to prevent further inflation until a subsequent - * call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}. - * - * @param chunk The inflated chunk of data - * @return False if inflating should continue, or True if the call - * to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)} - * should return, allowing back pressure of compressed data. - */ - protected boolean decodedChunk(ByteBuffer chunk) - { - if (_inflated==null) + if (_inflateds.isEmpty()) { - _inflated=chunk; + if (BufferUtil.isEmpty(_inflated) || _state == State.CRC || _state == State.ISIZE) + return BufferUtil.EMPTY_BUFFER; + ByteBuffer result = _inflated; + _inflated = null; + return result; } else { - int size = _inflated.remaining() + chunk.remaining(); - if (size<=_inflated.capacity()) + _inflateds.add(_inflated); + _inflated = null; + int length = _inflateds.stream().mapToInt(Buffer::remaining).sum(); + ByteBuffer result = acquire(length); + for (ByteBuffer buffer : _inflateds) { - BufferUtil.append(_inflated,chunk); + BufferUtil.append(result, buffer); + release(buffer); + } + _inflateds.clear(); + return result; + } + } + + /** + *

Called when a chunk of data is inflated.

+ *

The default implementation aggregates all the chunks + * into a single buffer returned from {@link #decode(ByteBuffer)}.

+ *

Derived implementations may choose to consume inflated chunks + * individually and return {@code true} from this method to prevent + * further inflation until a subsequent call to {@link #decode(ByteBuffer)} + * or {@link #decodeChunks(ByteBuffer)} is made. + * + * @param chunk the inflated chunk of data + * @return false if inflating should continue, or true if the call + * to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)} + * should return, allowing to consume the inflated chunk and apply + * backpressure + */ + protected boolean decodedChunk(ByteBuffer chunk) + { + if (_inflated == null) + { + _inflated = chunk; + } + else + { + if (BufferUtil.space(_inflated) >= chunk.remaining()) + { + BufferUtil.append(_inflated, chunk); release(chunk); } else { - ByteBuffer bigger=acquire(size); - int pos=BufferUtil.flipToFill(bigger); - BufferUtil.put(_inflated,bigger); - BufferUtil.put(chunk,bigger); - BufferUtil.flipToFlush(bigger,pos); - release(_inflated); - release(chunk); - _inflated = bigger; + _inflateds.add(_inflated); + _inflated = chunk; } } return false; } /** - * Inflate compressed data. - *

Inflation continues until the compressed block end is reached, there is no - * more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true. - * @param compressed Buffer of compressed data to inflate + *

Inflates compressed data.

+ *

Inflation continues until the compressed block end is reached, there is no + * more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.

+ * + * @param compressed the buffer of compressed data to inflate */ protected void decodeChunks(ByteBuffer compressed) { @@ -164,24 +194,24 @@ public class GZIPContentDecoder implements Destroyable } break; } - + case DATA: { while (true) { - if (buffer==null) + if (buffer == null) buffer = acquire(_bufferSize); - + try { - int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity()); + int length = _inflater.inflate(buffer.array(), buffer.arrayOffset(), buffer.capacity()); buffer.limit(length); } catch (DataFormatException x) { throw new ZipException(x.getMessage()); } - + if (buffer.hasRemaining()) { ByteBuffer chunk = buffer; @@ -195,7 +225,7 @@ public class GZIPContentDecoder implements Destroyable return; if (compressed.hasArray()) { - _inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining()); + _inflater.setInput(compressed.array(), compressed.arrayOffset() + compressed.position(), compressed.remaining()); compressed.position(compressed.limit()); } else @@ -204,7 +234,7 @@ public class GZIPContentDecoder implements Destroyable byte[] input = new byte[compressed.remaining()]; compressed.get(input); _inflater.setInput(input); - } + } } else if (_inflater.finished()) { @@ -218,14 +248,14 @@ public class GZIPContentDecoder implements Destroyable } continue; } - + default: break; } - + if (!compressed.hasRemaining()) break; - + byte currByte = compressed.get(); switch (_state) { @@ -354,7 +384,7 @@ public class GZIPContentDecoder implements Destroyable // TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output); reset(); - return ; + return; } break; } @@ -369,7 +399,7 @@ public class GZIPContentDecoder implements Destroyable } finally { - if (buffer!=null) + if (buffer != null) release(buffer); } } @@ -398,28 +428,28 @@ public class GZIPContentDecoder implements Destroyable { INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE } - + /** - * @param capacity capacity capacity of the allocated ByteBuffer - * @return An indirect buffer of the configured buffersize either from the pool or freshly allocated. + * @param capacity capacity of the ByteBuffer to acquire + * @return a heap buffer of the configured capacity either from the pool or freshly allocated. */ public ByteBuffer acquire(int capacity) { - return _pool==null?BufferUtil.allocate(capacity):_pool.acquire(capacity,false); + return _pool == null ? BufferUtil.allocate(capacity) : _pool.acquire(capacity, false); } - + /** - * Release an allocated buffer. - *

This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has - * been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)} - * or passed to {@link #decodedChunk(ByteBuffer)}. - * @param buffer The buffer to release. + *

Releases an allocated buffer.

+ *

This method calls {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has + * been configured.

+ *

This method should be called once for all buffers returned from {@link #decode(ByteBuffer)} + * or passed to {@link #decodedChunk(ByteBuffer)}.

+ * + * @param buffer the buffer to release. */ public void release(ByteBuffer buffer) { - @SuppressWarnings("ReferenceEquality") - boolean isTheEmptyBuffer = (buffer==BufferUtil.EMPTY_BUFFER); - if (_pool!=null && !isTheEmptyBuffer) + if (_pool != null && !BufferUtil.isTheEmptyBuffer(buffer)) _pool.release(buffer); } } diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java index b5dea752126..46b22aa36b3 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java @@ -18,10 +18,6 @@ package org.eclipse.jetty.http; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -35,52 +31,55 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class GZIPContentDecoderTest { - ArrayByteBufferPool pool; - AtomicInteger buffers = new AtomicInteger(0); - + private ArrayByteBufferPool pool; + private AtomicInteger buffers = new AtomicInteger(0); + @BeforeEach - public void beforeClass() throws Exception + public void before() { buffers.set(0); pool = new ArrayByteBufferPool() + { + + @Override + public ByteBuffer acquire(int size, boolean direct) { + buffers.incrementAndGet(); + return super.acquire(size, direct); + } - @Override - public ByteBuffer acquire(int size, boolean direct) - { - buffers.incrementAndGet(); - return super.acquire(size,direct); - } + @Override + public void release(ByteBuffer buffer) + { + buffers.decrementAndGet(); + super.release(buffer); + } - @Override - public void release(ByteBuffer buffer) - { - buffers.decrementAndGet(); - super.release(buffer); - } - - }; + }; } - + @AfterEach - public void afterClass() throws Exception + public void after() { - assertEquals(0,buffers.get()); + assertEquals(0, buffers.get()); } - + @Test - public void testCompresedContentFormat() throws Exception + public void testCompressedContentFormat() { - assertTrue(CompressedContentFormat.tagEquals("tag","tag")); - assertTrue(CompressedContentFormat.tagEquals("\"tag\"","\"tag\"")); - assertTrue(CompressedContentFormat.tagEquals("\"tag\"","\"tag--gzip\"")); - assertFalse(CompressedContentFormat.tagEquals("Zag","Xag--gzip")); - assertFalse(CompressedContentFormat.tagEquals("xtag","tag")); + assertTrue(CompressedContentFormat.tagEquals("tag", "tag")); + assertTrue(CompressedContentFormat.tagEquals("\"tag\"", "\"tag\"")); + assertTrue(CompressedContentFormat.tagEquals("\"tag\"", "\"tag--gzip\"")); + assertFalse(CompressedContentFormat.tagEquals("Zag", "Xag--gzip")); + assertFalse(CompressedContentFormat.tagEquals("xtag", "tag")); } - + @Test public void testStreamNoBlocks() throws Exception { @@ -122,7 +121,7 @@ public class GZIPContentDecoderTest output.close(); byte[] bytes = baos.toByteArray(); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); assertEquals(0, decoded.remaining()); } @@ -138,7 +137,7 @@ public class GZIPContentDecoderTest output.close(); byte[] bytes = baos.toByteArray(); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); decoder.release(decoded); @@ -161,7 +160,7 @@ public class GZIPContentDecoderTest byte[] bytes2 = new byte[bytes.length - bytes1.length]; System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); assertEquals(0, decoded.capacity()); decoded = decoder.decode(ByteBuffer.wrap(bytes2)); @@ -186,7 +185,7 @@ public class GZIPContentDecoderTest byte[] bytes2 = new byte[bytes.length - bytes1.length]; System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); assertFalse(decoder.isFinished()); @@ -214,7 +213,7 @@ public class GZIPContentDecoderTest byte[] bytes2 = new byte[bytes.length - bytes1.length]; System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); assertEquals(0, decoded.capacity()); decoder.release(decoded); @@ -244,7 +243,7 @@ public class GZIPContentDecoderTest System.arraycopy(bytes1, 0, bytes, 0, bytes1.length); System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length); - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer buffer = ByteBuffer.wrap(bytes); ByteBuffer decoded = decoder.decode(buffer); assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString()); @@ -271,7 +270,7 @@ public class GZIPContentDecoderTest byte[] bytes = baos.toByteArray(); String result = ""; - GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048); ByteBuffer buffer = ByteBuffer.wrap(bytes); while (buffer.hasRemaining()) { diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java index f1e29d97568..da71e6c32fe 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.TimeUnit; @@ -41,12 +42,14 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.ContentDecoder; import org.eclipse.jetty.client.GZIPContentDecoder; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -275,7 +278,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet } @Override - public void onDataAvailable() throws IOException + public void onDataAvailable() { iterate(); } @@ -370,9 +373,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (size > 0) { CountingCallback counter = new CountingCallback(callback, size); - for (int i = 0; i < size; ++i) + for (ByteBuffer buffer : buffers) { - ByteBuffer buffer = buffers.get(i); newContentBytes += buffer.remaining(); provider.offer(buffer, counter); } @@ -476,9 +478,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (size > 0) { Callback counter = size == 1 ? callback : new CountingCallback(callback, size); - for (int i = 0; i < size; ++i) + for (ByteBuffer buffer : buffers) { - ByteBuffer buffer = buffers.get(i); newContentBytes += buffer.remaining(); proxyWriter.offer(buffer, counter); } @@ -540,9 +541,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (size > 0) { Callback callback = size == 1 ? complete : new CountingCallback(complete, size); - for (int i = 0; i < size; ++i) + for (ByteBuffer buffer : buffers) { - ByteBuffer buffer = buffers.get(i); newContentBytes += buffer.remaining(); proxyWriter.offer(buffer, callback); } @@ -686,7 +686,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet /** *

Allows applications to transform upstream and downstream content.

*

Typical use cases of transformations are URL rewriting of HTML anchors - * (where the value of the href attribute of <a> elements + * (where the value of the {@code href} attribute of <a> elements * is modified by the proxy), field renaming of JSON documents, etc.

*

Applications should override {@link #newClientRequestContentTransformer(HttpServletRequest, Request)} * and/or {@link #newServerResponseContentTransformer(HttpServletRequest, HttpServletResponse, Response)} @@ -762,16 +762,23 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet private static final Logger logger = Log.getLogger(GZIPContentTransformer.class); private final List buffers = new ArrayList<>(2); - private final ContentDecoder decoder = new GZIPContentDecoder(); private final ContentTransformer transformer; + private final ContentDecoder decoder; private final ByteArrayOutputStream out; private final GZIPOutputStream gzipOut; public GZIPContentTransformer(ContentTransformer transformer) + { + this(null, transformer); + } + + public GZIPContentTransformer(HttpClient httpClient, ContentTransformer transformer) { try { this.transformer = transformer; + ByteBufferPool byteBufferPool = httpClient == null ? null : httpClient.getByteBufferPool(); + this.decoder = new GZIPContentDecoder(byteBufferPool, GZIPContentDecoder.DEFAULT_BUFFER_SIZE); this.out = new ByteArrayOutputStream(); this.gzipOut = new GZIPOutputStream(out); } @@ -787,6 +794,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet if (logger.isDebugEnabled()) logger.debug("Ungzipping {} bytes, finished={}", input.remaining(), finished); + List decodeds = Collections.emptyList(); if (!input.hasRemaining()) { if (finished) @@ -794,14 +802,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet } else { - while (input.hasRemaining()) + decodeds = new ArrayList<>(); + while (true) { ByteBuffer decoded = decoder.decode(input); - boolean complete = finished && !input.hasRemaining(); + decodeds.add(decoded); + boolean decodeComplete = !input.hasRemaining() && !decoded.hasRemaining(); + boolean complete = finished && decodeComplete; if (logger.isDebugEnabled()) logger.debug("Ungzipped {} bytes, complete={}", decoded.remaining(), complete); if (decoded.hasRemaining() || complete) transformer.transform(decoded, complete, buffers); + if (decodeComplete) + break; } } @@ -811,6 +824,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet buffers.clear(); output.add(result); } + + decodeds.forEach(decoder::release); } private ByteBuffer gzip(List buffers, boolean finished) throws IOException diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java index f273fc262f1..2efd189a993 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java @@ -18,13 +18,6 @@ package org.eclipse.jetty.proxy; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -62,7 +55,6 @@ import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.FutureResponseListener; @@ -87,10 +79,16 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.OS; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class AsyncMiddleManServletTest { private static final Logger LOG = Log.getLogger(AsyncMiddleManServletTest.class); @@ -120,7 +118,7 @@ public class AsyncMiddleManServletTest private void startProxy(AsyncMiddleManServlet proxyServlet) throws Exception { - startProxy(proxyServlet, new HashMap()); + startProxy(proxyServlet, new HashMap<>()); } private void startProxy(AsyncMiddleManServlet proxyServlet, Map initParams) throws Exception @@ -144,8 +142,8 @@ public class AsyncMiddleManServletTest proxyContext.addServlet(proxyServletHolder, "/*"); proxy.start(); - - stackless=new StacklessLogging(proxyServlet._log); + + stackless = new StacklessLogging(proxyServlet._log); } private void startClient() throws Exception @@ -219,7 +217,7 @@ public class AsyncMiddleManServletTest @Override protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest) { - return new GZIPContentTransformer(ContentTransformer.IDENTITY); + return new GZIPContentTransformer(getHttpClient(), ContentTransformer.IDENTITY); } }); startClient(); @@ -247,7 +245,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); response.getOutputStream().write(gzipBytes); @@ -318,7 +316,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); @@ -412,7 +410,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); @@ -454,7 +452,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { // decode input stream thru gzip ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -500,13 +498,9 @@ public class AsyncMiddleManServletTest @Override protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest) { - return new ContentTransformer() + return (input, finished, output) -> { - @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException - { - throw new NullPointerException("explicitly_thrown_by_test"); - } + throw new NullPointerException("explicitly_thrown_by_test"); }; } }); @@ -537,7 +531,7 @@ public class AsyncMiddleManServletTest private int count; @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { if (++count < 2) output.add(input); @@ -552,16 +546,12 @@ public class AsyncMiddleManServletTest final CountDownLatch latch = new CountDownLatch(1); DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", serverConnector.getLocalPort()) - .content(content) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - if (result.isSucceeded() && result.getResponse().getStatus() == 502) - latch.countDown(); - } - }); + .content(content) + .send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == 502) + latch.countDown(); + }); content.offer(ByteBuffer.allocate(512)); sleep(1000); @@ -578,7 +568,7 @@ public class AsyncMiddleManServletTest testDownstreamTransformationThrows(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { // To trigger the test failure we need that onContent() // is called twice, so the second time the test throws. @@ -597,7 +587,7 @@ public class AsyncMiddleManServletTest testDownstreamTransformationThrows(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { // To trigger the test failure we need that onContent() // is called only once, so the the test throws from onSuccess(). @@ -621,7 +611,7 @@ public class AsyncMiddleManServletTest private int count; @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { if (++count < 2) output.add(input); @@ -660,7 +650,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { OutputStream output = response.getOutputStream(); if (gzipped) @@ -694,25 +684,17 @@ public class AsyncMiddleManServletTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest("localhost", serverConnector.getLocalPort()) - .onResponseContent(new Response.ContentListener() + .onResponseContent((response, content) -> { - @Override - public void onContent(Response response, ByteBuffer content) - { - // Slow down the reader so that the - // write from the proxy gets congested. - sleep(1); - } + // Slow down the reader so that the + // write from the proxy gets congested. + sleep(1); }) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - assertTrue(result.isSucceeded()); - assertEquals(200, result.getResponse().getStatus()); - latch.countDown(); - } + assertTrue(result.isSucceeded()); + assertEquals(200, result.getResponse().getStatus()); + latch.countDown(); }); assertTrue(latch.await(15, TimeUnit.SECONDS)); @@ -724,7 +706,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { byte[] chunk = new byte[1024]; int contentLength = 2 * chunk.length; @@ -741,14 +723,10 @@ public class AsyncMiddleManServletTest @Override protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) { - return new ContentTransformer() + return (input, finished, output) -> { - @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException - { - if (!finished) - output.add(input); - } + if (!finished) + output.add(input); }; } }); @@ -779,15 +757,11 @@ public class AsyncMiddleManServletTest DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", serverConnector.getLocalPort()) .content(content) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - System.err.println(result); - if (result.getResponse().getStatus() == 500) - latch.countDown(); - } + System.err.println(result); + if (result.getResponse().getStatus() == 500) + latch.countDown(); }); content.offer(ByteBuffer.allocate(512)); sleep(1000); @@ -821,16 +795,12 @@ public class AsyncMiddleManServletTest final CountDownLatch latch = new CountDownLatch(1); DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", serverConnector.getLocalPort()) - .content(content) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - if (result.getResponse().getStatus() == 502) - latch.countDown(); - } - }); + .content(content) + .send(result -> + { + if (result.getResponse().getStatus() == 502) + latch.countDown(); + }); content.offer(ByteBuffer.allocate(512)); sleep(1000); content.offer(ByteBuffer.allocate(512)); @@ -857,7 +827,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { ServletOutputStream output = response.getOutputStream(); output.write(new byte[512]); @@ -898,7 +868,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8)); } @@ -916,7 +886,7 @@ public class AsyncMiddleManServletTest { InputStream input = source.getInputStream(); @SuppressWarnings("unchecked") - Map obj = (Map)JSON.parse(new InputStreamReader(input, "UTF-8")); + Map obj = (Map)JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8)); // Transform the object. obj.put(key2, obj.remove(key1)); try (OutputStream output = sink.getOutputStream()) @@ -961,7 +931,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { // Write the content in two chunks. int chunk = data.length / 2; @@ -1021,7 +991,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8)); } @@ -1041,7 +1011,7 @@ public class AsyncMiddleManServletTest { InputStream input = source.getInputStream(); @SuppressWarnings("unchecked") - Map obj = (Map)JSON.parse(new InputStreamReader(input, "UTF-8")); + Map obj = (Map)JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8)); // Transform the object. obj.put(key2, obj.remove(key1)); try (OutputStream output = sink.getOutputStream()) @@ -1080,7 +1050,7 @@ public class AsyncMiddleManServletTest } // File deletion is delayed on windows, testing for deletion is not going to work - if(!OS.WINDOWS.isCurrentOs()) + if (!OS.WINDOWS.isCurrentOs()) { try (DirectoryStream paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*")) { @@ -1097,7 +1067,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { IO.copy(request.getInputStream(), IO.getNullStream()); } @@ -1163,7 +1133,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); response.setContentLength(2); @@ -1237,7 +1207,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8)); } @@ -1260,7 +1230,7 @@ public class AsyncMiddleManServletTest if (readSource) { InputStream input = source.getInputStream(); - JSON.parse(new InputStreamReader(input, "UTF-8")); + JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8)); } // No transformation. return false; @@ -1289,7 +1259,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) { response.setStatus(HttpStatus.UNAUTHORIZED_401); response.setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), "Basic realm=\"test\""); @@ -1304,7 +1274,7 @@ public class AsyncMiddleManServletTest return new AfterContentTransformer() { @Override - public boolean transform(Source source, Sink sink) throws IOException + public boolean transform(Source source, Sink sink) { transformed.set(true); return false; @@ -1433,7 +1403,7 @@ public class AsyncMiddleManServletTest private ByteBuffer buffer; @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { // Buffer only the first chunk. if (buffer == null) @@ -1495,7 +1465,7 @@ public class AsyncMiddleManServletTest startServer(new HttpServlet() { @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + protected void doGet(HttpServletRequest req, HttpServletResponse resp) { if (req.getHeader("Via") != null) resp.addHeader(PROXIED_HEADER, "true"); @@ -1503,9 +1473,11 @@ public class AsyncMiddleManServletTest } }); final String proxyTo = "http://localhost:" + serverConnector.getLocalPort(); - AsyncMiddleManServlet proxyServlet = new AsyncMiddleManServlet.Transparent() { + AsyncMiddleManServlet proxyServlet = new AsyncMiddleManServlet.Transparent() + { @Override - protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) { + protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) + { return ContentTransformer.IDENTITY; } }; @@ -1689,7 +1661,7 @@ public class AsyncMiddleManServletTest private final List buffers = new ArrayList<>(); @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { if (input.hasRemaining()) { @@ -1715,7 +1687,7 @@ public class AsyncMiddleManServletTest private StringBuilder head = new StringBuilder(); @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { if (input.hasRemaining() && head != null) { @@ -1723,12 +1695,12 @@ public class AsyncMiddleManServletTest if (lnPos == -1) { // no linefeed found, copy it all - copyHeadBytes(input,input.limit()); + copyHeadBytes(input, input.limit()); } else { // found linefeed - copyHeadBytes(input,lnPos); + copyHeadBytes(input, lnPos); output.addAll(getHeadBytes()); // mark head as sent head = null; @@ -1764,7 +1736,7 @@ public class AsyncMiddleManServletTest private List getHeadBytes() { - ByteBuffer buf = BufferUtil.toBuffer(head.toString(),StandardCharsets.UTF_8); + ByteBuffer buf = BufferUtil.toBuffer(head.toString(), StandardCharsets.UTF_8); return Collections.singletonList(buf); } } @@ -1772,7 +1744,7 @@ public class AsyncMiddleManServletTest private static class DiscardContentTransformer implements AsyncMiddleManServlet.ContentTransformer { @Override - public void transform(ByteBuffer input, boolean finished, List output) throws IOException + public void transform(ByteBuffer input, boolean finished, List output) { } }