Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2019-02-28 16:58:23 +01:00
commit 39d122a9fb
9 changed files with 371 additions and 569 deletions

View File

@ -35,6 +35,15 @@ public interface ContentDecoder
*/
public abstract ByteBuffer decode(ByteBuffer buffer);
/**
* <p>Releases the ByteBuffer returned by {@link #decode(ByteBuffer)}.</p>
*
* @param decoded the ByteBuffer returned by {@link #decode(ByteBuffer)}
*/
public default void release(ByteBuffer decoded)
{
}
/**
* Factory for {@link ContentDecoder}s; subclasses must implement {@link #newContentDecoder()}.
* <p>

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
/**
@ -25,7 +27,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
*/
public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder
{
private static final int DEFAULT_BUFFER_SIZE = 2048;
public static final int DEFAULT_BUFFER_SIZE = 8192;
public GZIPContentDecoder()
{
@ -42,6 +44,13 @@ public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecode
super(byteBufferPool, bufferSize);
}
@Override
protected boolean decodedChunk(ByteBuffer chunk)
{
super.decodedChunk(chunk);
return true;
}
/**
* Specialized {@link ContentDecoder.Factory} for the "gzip" encoding.
*/

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@ -37,7 +36,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -339,35 +338,7 @@ public abstract class HttpReceiver
}
else
{
try
{
List<ByteBuffer> decodeds = new ArrayList<>(2);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
if (!decoded.hasRemaining())
continue;
decodeds.add(decoded);
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
}
if (decodeds.isEmpty())
{
callback.succeeded();
}
else
{
int size = decodeds.size();
CountingCallback counter = new CountingCallback(callback, size);
for (ByteBuffer decoded : decodeds)
notifier.notifyContent(response, decoded, counter, contentListeners);
}
}
catch (Throwable x)
{
callback.failed(x);
}
new Decoder(notifier, response, decoder, buffer, callback).iterate();
}
if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
@ -615,4 +586,47 @@ public abstract class HttpReceiver
*/
FAILURE
}
private class Decoder extends IteratingNestedCallback
{
private final ResponseNotifier notifier;
private final HttpResponse response;
private final ContentDecoder decoder;
private final ByteBuffer buffer;
private ByteBuffer decoded;
public Decoder(ResponseNotifier notifier, HttpResponse response, ContentDecoder decoder, ByteBuffer buffer, Callback callback)
{
super(callback);
this.notifier = notifier;
this.response = response;
this.decoder = decoder;
this.buffer = buffer;
}
@Override
protected Action process() throws Throwable
{
while (true)
{
decoded = decoder.decode(buffer);
if (decoded.hasRemaining())
break;
if (!buffer.hasRemaining())
return Action.SUCCEEDED;
}
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
notifier.notifyContent(response, decoded, this, contentListeners);
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
decoder.release(decoded);
super.succeeded();
}
}
}

View File

@ -1,288 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.junit.jupiter.api.Test;
@Deprecated
public class GZIPContentDecoderTest
{
@Test
public void testStreamNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read = input.read();
assertEquals(-1, read);
}
@Test
public void testStreamBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
baos = new ByteArrayOutputStream();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read;
while ((read = input.read()) >= 0)
baos.write(read);
assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8));
}
@Test
public void testNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(0, decoded.remaining());
}
@Test
public void testSmallBlock() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
}
@Test
public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The header is 10 bytes, chunk at 11 bytes
byte[] bytes1 = new byte[11];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
}
@Test
public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 8 bytes, chunk the last 9 bytes
byte[] bytes1 = new byte[bytes.length - 9];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
assertFalse(decoder.isFinished());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(0, decoded.remaining());
assertTrue(decoder.isFinished());
}
@Test
public void testSmallBlockWithGZIPTrailerChunked() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 4+4 bytes, chunk the last 3 bytes
byte[] bytes1 = new byte[bytes.length - 3];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
}
@Test
public void testTwoSmallBlocks() throws Exception
{
String data1 = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "1";
baos = new ByteArrayOutputStream();
output = new GZIPOutputStream(baos);
output.write(data2.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes2 = baos.toByteArray();
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer decoded = decoder.decode(buffer);
assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertTrue(buffer.hasRemaining());
decoded = decoder.decode(buffer);
assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertFalse(buffer.hasRemaining());
}
@Test
public void testBigBlock() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder();
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
result += StandardCharsets.UTF_8.decode(decoded).toString();
}
assertEquals(data, result);
}
@Test
public void testBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()}));
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
}
assertEquals(data, result);
assertTrue(decoder.isFinished());
}
@Test
public void testBigBlockWithExtraBytes() throws Exception
{
String data1 = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data1 += data1;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "HELLO";
byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8);
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
if (decoder.isFinished())
break;
}
assertEquals(data1, result);
assertTrue(buffer.hasRemaining());
assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString());
}
}

View File

@ -18,29 +18,36 @@
package org.eclipse.jetty.client;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class HttpClientGZIPTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ -48,12 +55,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
public void testGZIPContentEncoding(Scenario scenario) throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
GZIPOutputStream gzipOutput = new GZIPOutputStream(response.getOutputStream());
gzipOutput.write(data);
@ -75,12 +81,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
public void testGZIPContentOneByteAtATime(Scenario scenario) throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
@ -112,12 +117,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
public void testGZIPContentSentTwiceInOneWrite(Scenario scenario) throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
@ -164,12 +168,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
private void testGZIPContentFragmented(Scenario scenario, final int fragment) throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
@ -204,12 +207,11 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class)
public void testGZIPContentCorrupted(Scenario scenario) throws Exception
{
start(scenario, new AbstractHandler()
start(scenario, new EmptyServerHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
// Not gzipped, will cause the client to blow up.
response.getOutputStream().print("0123456789");
@ -228,6 +230,46 @@ public class HttpClientGZIPTest extends AbstractHttpClientServerTest
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testLargeGZIPContentDoesNotPolluteByteBufferPool(Scenario scenario) throws Exception
{
String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
Random random = new Random();
byte[] content = new byte[1024 * 1024];
for (int i = 0; i < content.length; ++i)
content[i] = (byte)digits.charAt(random.nextInt(digits.length()));
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name());
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream());
gzip.write(content);
gzip.finish();
}
});
ByteBufferPool pool = client.getByteBufferPool();
assumeTrue(pool instanceof MappedByteBufferPool);
MappedByteBufferPool bufferPool = (MappedByteBufferPool)pool;
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertArrayEquals(content, response.getContent());
long directMemory = bufferPool.getMemory(true);
assertThat(directMemory, lessThan((long)content.length));
long heapMemory = bufferPool.getMemory(false);
assertThat(heapMemory, lessThan((long)content.length));
}
private static void sleep(long ms) throws IOException
{
try

View File

@ -18,7 +18,10 @@
package org.eclipse.jetty.http;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.zip.ZipException;
@ -28,13 +31,13 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.Destroyable;
/**
* Decoder for the "gzip" encoding.
* <p>
* A decoder that inflates gzip compressed data that has been
* optimized for async usage with minimal data copies.
* <p>Decoder for the "gzip" content encoding.</p>
* <p>This decoder inflates gzip compressed data, and has
* been optimized for async usage with minimal data copies.</p>
*/
public class GZIPContentDecoder implements Destroyable
{
private final List<ByteBuffer> _inflateds = new ArrayList<>();
private final Inflater _inflater = new Inflater(true);
private final ByteBufferPool _pool;
private final int _bufferSize;
@ -46,14 +49,14 @@ public class GZIPContentDecoder implements Destroyable
public GZIPContentDecoder()
{
this(null,2048);
this(null, 2048);
}
public GZIPContentDecoder(int bufferSize)
{
this(null,bufferSize);
this(null, bufferSize);
}
public GZIPContentDecoder(ByteBufferPool pool, int bufferSize)
{
_bufferSize = bufferSize;
@ -61,68 +64,95 @@ public class GZIPContentDecoder implements Destroyable
reset();
}
/** Inflate compressed data from a buffer.
*
* @param compressed Buffer containing compressed data.
* @return Buffer containing inflated data.
/**
* <p>Inflates compressed data from a buffer.</p>
* <p>The buffers returned by this method should be released
* via {@link #release(ByteBuffer)}.</p>
* <p>This method may fully consume the input buffer, but return
* only a chunk of the inflated bytes, to allow applications to
* consume the inflated chunk before performing further inflation,
* applying backpressure. In this case, this method should be
* invoked again with the same input buffer (even if
* it's already fully consumed) and that will produce another
* chunk of inflated bytes. Termination happens when the input
* buffer is fully consumed, and the returned buffer is empty.</p>
* <p>See {@link #decodedChunk(ByteBuffer)} to perform inflating
* in a non-blocking way that allows to apply backpressure.</p>
*
* @param compressed the buffer containing compressed data.
* @return a buffer containing inflated data.
*/
public ByteBuffer decode(ByteBuffer compressed)
{
decodeChunks(compressed);
if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE )
return BufferUtil.EMPTY_BUFFER;
ByteBuffer result = _inflated;
_inflated = null;
return result;
}
/** Called when a chunk of data is inflated.
* <p>The default implementation aggregates all the chunks
* into a single buffer returned from {@link #decode(ByteBuffer)}.
* Derived implementations may choose to consume chunks individually
* and return false to prevent further inflation until a subsequent
* call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}.
*
* @param chunk The inflated chunk of data
* @return False if inflating should continue, or True if the call
* to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)}
* should return, allowing back pressure of compressed data.
*/
protected boolean decodedChunk(ByteBuffer chunk)
{
if (_inflated==null)
if (_inflateds.isEmpty())
{
_inflated=chunk;
if (BufferUtil.isEmpty(_inflated) || _state == State.CRC || _state == State.ISIZE)
return BufferUtil.EMPTY_BUFFER;
ByteBuffer result = _inflated;
_inflated = null;
return result;
}
else
{
int size = _inflated.remaining() + chunk.remaining();
if (size<=_inflated.capacity())
_inflateds.add(_inflated);
_inflated = null;
int length = _inflateds.stream().mapToInt(Buffer::remaining).sum();
ByteBuffer result = acquire(length);
for (ByteBuffer buffer : _inflateds)
{
BufferUtil.append(_inflated,chunk);
BufferUtil.append(result, buffer);
release(buffer);
}
_inflateds.clear();
return result;
}
}
/**
* <p>Called when a chunk of data is inflated.</p>
* <p>The default implementation aggregates all the chunks
* into a single buffer returned from {@link #decode(ByteBuffer)}.</p>
* <p>Derived implementations may choose to consume inflated chunks
* individually and return {@code true} from this method to prevent
* further inflation until a subsequent call to {@link #decode(ByteBuffer)}
* or {@link #decodeChunks(ByteBuffer)} is made.
*
* @param chunk the inflated chunk of data
* @return false if inflating should continue, or true if the call
* to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)}
* should return, allowing to consume the inflated chunk and apply
* backpressure
*/
protected boolean decodedChunk(ByteBuffer chunk)
{
if (_inflated == null)
{
_inflated = chunk;
}
else
{
if (BufferUtil.space(_inflated) >= chunk.remaining())
{
BufferUtil.append(_inflated, chunk);
release(chunk);
}
else
{
ByteBuffer bigger=acquire(size);
int pos=BufferUtil.flipToFill(bigger);
BufferUtil.put(_inflated,bigger);
BufferUtil.put(chunk,bigger);
BufferUtil.flipToFlush(bigger,pos);
release(_inflated);
release(chunk);
_inflated = bigger;
_inflateds.add(_inflated);
_inflated = chunk;
}
}
return false;
}
/**
* Inflate compressed data.
* <p>Inflation continues until the compressed block end is reached, there is no
* more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.
* @param compressed Buffer of compressed data to inflate
* <p>Inflates compressed data.</p>
* <p>Inflation continues until the compressed block end is reached, there is no
* more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.</p>
*
* @param compressed the buffer of compressed data to inflate
*/
protected void decodeChunks(ByteBuffer compressed)
{
@ -164,24 +194,24 @@ public class GZIPContentDecoder implements Destroyable
}
break;
}
case DATA:
{
while (true)
{
if (buffer==null)
if (buffer == null)
buffer = acquire(_bufferSize);
try
{
int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity());
int length = _inflater.inflate(buffer.array(), buffer.arrayOffset(), buffer.capacity());
buffer.limit(length);
}
catch (DataFormatException x)
{
throw new ZipException(x.getMessage());
}
if (buffer.hasRemaining())
{
ByteBuffer chunk = buffer;
@ -195,7 +225,7 @@ public class GZIPContentDecoder implements Destroyable
return;
if (compressed.hasArray())
{
_inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining());
_inflater.setInput(compressed.array(), compressed.arrayOffset() + compressed.position(), compressed.remaining());
compressed.position(compressed.limit());
}
else
@ -204,7 +234,7 @@ public class GZIPContentDecoder implements Destroyable
byte[] input = new byte[compressed.remaining()];
compressed.get(input);
_inflater.setInput(input);
}
}
}
else if (_inflater.finished())
{
@ -218,14 +248,14 @@ public class GZIPContentDecoder implements Destroyable
}
continue;
}
default:
break;
}
if (!compressed.hasRemaining())
break;
byte currByte = compressed.get();
switch (_state)
{
@ -354,7 +384,7 @@ public class GZIPContentDecoder implements Destroyable
// TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
reset();
return ;
return;
}
break;
}
@ -369,7 +399,7 @@ public class GZIPContentDecoder implements Destroyable
}
finally
{
if (buffer!=null)
if (buffer != null)
release(buffer);
}
}
@ -398,28 +428,28 @@ public class GZIPContentDecoder implements Destroyable
{
INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
}
/**
* @param capacity capacity capacity of the allocated ByteBuffer
* @return An indirect buffer of the configured buffersize either from the pool or freshly allocated.
* @param capacity capacity of the ByteBuffer to acquire
* @return a heap buffer of the configured capacity either from the pool or freshly allocated.
*/
public ByteBuffer acquire(int capacity)
{
return _pool==null?BufferUtil.allocate(capacity):_pool.acquire(capacity,false);
return _pool == null ? BufferUtil.allocate(capacity) : _pool.acquire(capacity, false);
}
/**
* Release an allocated buffer.
* <p>This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has
* been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)}
* or passed to {@link #decodedChunk(ByteBuffer)}.
* @param buffer The buffer to release.
* <p>Releases an allocated buffer.</p>
* <p>This method calls {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has
* been configured.</p>
* <p>This method should be called once for all buffers returned from {@link #decode(ByteBuffer)}
* or passed to {@link #decodedChunk(ByteBuffer)}.</p>
*
* @param buffer the buffer to release.
*/
public void release(ByteBuffer buffer)
{
@SuppressWarnings("ReferenceEquality")
boolean isTheEmptyBuffer = (buffer==BufferUtil.EMPTY_BUFFER);
if (_pool!=null && !isTheEmptyBuffer)
if (_pool != null && !BufferUtil.isTheEmptyBuffer(buffer))
_pool.release(buffer);
}
}

View File

@ -18,10 +18,6 @@
package org.eclipse.jetty.http;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
@ -35,52 +31,55 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GZIPContentDecoderTest
{
ArrayByteBufferPool pool;
AtomicInteger buffers = new AtomicInteger(0);
private ArrayByteBufferPool pool;
private AtomicInteger buffers = new AtomicInteger(0);
@BeforeEach
public void beforeClass() throws Exception
public void before()
{
buffers.set(0);
pool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
buffers.incrementAndGet();
return super.acquire(size, direct);
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
buffers.incrementAndGet();
return super.acquire(size,direct);
}
@Override
public void release(ByteBuffer buffer)
{
buffers.decrementAndGet();
super.release(buffer);
}
@Override
public void release(ByteBuffer buffer)
{
buffers.decrementAndGet();
super.release(buffer);
}
};
};
}
@AfterEach
public void afterClass() throws Exception
public void after()
{
assertEquals(0,buffers.get());
assertEquals(0, buffers.get());
}
@Test
public void testCompresedContentFormat() throws Exception
public void testCompressedContentFormat()
{
assertTrue(CompressedContentFormat.tagEquals("tag","tag"));
assertTrue(CompressedContentFormat.tagEquals("\"tag\"","\"tag\""));
assertTrue(CompressedContentFormat.tagEquals("\"tag\"","\"tag--gzip\""));
assertFalse(CompressedContentFormat.tagEquals("Zag","Xag--gzip"));
assertFalse(CompressedContentFormat.tagEquals("xtag","tag"));
assertTrue(CompressedContentFormat.tagEquals("tag", "tag"));
assertTrue(CompressedContentFormat.tagEquals("\"tag\"", "\"tag\""));
assertTrue(CompressedContentFormat.tagEquals("\"tag\"", "\"tag--gzip\""));
assertFalse(CompressedContentFormat.tagEquals("Zag", "Xag--gzip"));
assertFalse(CompressedContentFormat.tagEquals("xtag", "tag"));
}
@Test
public void testStreamNoBlocks() throws Exception
{
@ -122,7 +121,7 @@ public class GZIPContentDecoderTest
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(0, decoded.remaining());
}
@ -138,7 +137,7 @@ public class GZIPContentDecoderTest
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
@ -161,7 +160,7 @@ public class GZIPContentDecoderTest
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
@ -186,7 +185,7 @@ public class GZIPContentDecoderTest
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
assertFalse(decoder.isFinished());
@ -214,7 +213,7 @@ public class GZIPContentDecoderTest
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoder.release(decoded);
@ -244,7 +243,7 @@ public class GZIPContentDecoderTest
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer decoded = decoder.decode(buffer);
assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
@ -271,7 +270,7 @@ public class GZIPContentDecoderTest
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
@ -41,12 +42,14 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ContentDecoder;
import org.eclipse.jetty.client.GZIPContentDecoder;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -275,7 +278,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
@Override
public void onDataAvailable() throws IOException
public void onDataAvailable()
{
iterate();
}
@ -370,9 +373,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (size > 0)
{
CountingCallback counter = new CountingCallback(callback, size);
for (int i = 0; i < size; ++i)
for (ByteBuffer buffer : buffers)
{
ByteBuffer buffer = buffers.get(i);
newContentBytes += buffer.remaining();
provider.offer(buffer, counter);
}
@ -476,9 +478,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (size > 0)
{
Callback counter = size == 1 ? callback : new CountingCallback(callback, size);
for (int i = 0; i < size; ++i)
for (ByteBuffer buffer : buffers)
{
ByteBuffer buffer = buffers.get(i);
newContentBytes += buffer.remaining();
proxyWriter.offer(buffer, counter);
}
@ -540,9 +541,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (size > 0)
{
Callback callback = size == 1 ? complete : new CountingCallback(complete, size);
for (int i = 0; i < size; ++i)
for (ByteBuffer buffer : buffers)
{
ByteBuffer buffer = buffers.get(i);
newContentBytes += buffer.remaining();
proxyWriter.offer(buffer, callback);
}
@ -686,7 +686,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
/**
* <p>Allows applications to transform upstream and downstream content.</p>
* <p>Typical use cases of transformations are URL rewriting of HTML anchors
* (where the value of the <code>href</code> attribute of &lt;a&gt; elements
* (where the value of the {@code href} attribute of &lt;a&gt; elements
* is modified by the proxy), field renaming of JSON documents, etc.</p>
* <p>Applications should override {@link #newClientRequestContentTransformer(HttpServletRequest, Request)}
* and/or {@link #newServerResponseContentTransformer(HttpServletRequest, HttpServletResponse, Response)}
@ -762,16 +762,23 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
private static final Logger logger = Log.getLogger(GZIPContentTransformer.class);
private final List<ByteBuffer> buffers = new ArrayList<>(2);
private final ContentDecoder decoder = new GZIPContentDecoder();
private final ContentTransformer transformer;
private final ContentDecoder decoder;
private final ByteArrayOutputStream out;
private final GZIPOutputStream gzipOut;
public GZIPContentTransformer(ContentTransformer transformer)
{
this(null, transformer);
}
public GZIPContentTransformer(HttpClient httpClient, ContentTransformer transformer)
{
try
{
this.transformer = transformer;
ByteBufferPool byteBufferPool = httpClient == null ? null : httpClient.getByteBufferPool();
this.decoder = new GZIPContentDecoder(byteBufferPool, GZIPContentDecoder.DEFAULT_BUFFER_SIZE);
this.out = new ByteArrayOutputStream();
this.gzipOut = new GZIPOutputStream(out);
}
@ -787,6 +794,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
if (logger.isDebugEnabled())
logger.debug("Ungzipping {} bytes, finished={}", input.remaining(), finished);
List<ByteBuffer> decodeds = Collections.emptyList();
if (!input.hasRemaining())
{
if (finished)
@ -794,14 +802,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
else
{
while (input.hasRemaining())
decodeds = new ArrayList<>();
while (true)
{
ByteBuffer decoded = decoder.decode(input);
boolean complete = finished && !input.hasRemaining();
decodeds.add(decoded);
boolean decodeComplete = !input.hasRemaining() && !decoded.hasRemaining();
boolean complete = finished && decodeComplete;
if (logger.isDebugEnabled())
logger.debug("Ungzipped {} bytes, complete={}", decoded.remaining(), complete);
if (decoded.hasRemaining() || complete)
transformer.transform(decoded, complete, buffers);
if (decodeComplete)
break;
}
}
@ -811,6 +824,8 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
buffers.clear();
output.add(result);
}
decodeds.forEach(decoder::release);
}
private ByteBuffer gzip(List<ByteBuffer> buffers, boolean finished) throws IOException

View File

@ -18,13 +18,6 @@
package org.eclipse.jetty.proxy;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -62,7 +55,6 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
@ -87,10 +79,16 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.OS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncMiddleManServletTest
{
private static final Logger LOG = Log.getLogger(AsyncMiddleManServletTest.class);
@ -120,7 +118,7 @@ public class AsyncMiddleManServletTest
private void startProxy(AsyncMiddleManServlet proxyServlet) throws Exception
{
startProxy(proxyServlet, new HashMap<String, String>());
startProxy(proxyServlet, new HashMap<>());
}
private void startProxy(AsyncMiddleManServlet proxyServlet, Map<String, String> initParams) throws Exception
@ -144,8 +142,8 @@ public class AsyncMiddleManServletTest
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
stackless=new StacklessLogging(proxyServlet._log);
stackless = new StacklessLogging(proxyServlet._log);
}
private void startClient() throws Exception
@ -219,7 +217,7 @@ public class AsyncMiddleManServletTest
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new GZIPContentTransformer(ContentTransformer.IDENTITY);
return new GZIPContentTransformer(getHttpClient(), ContentTransformer.IDENTITY);
}
});
startClient();
@ -247,7 +245,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
response.getOutputStream().write(gzipBytes);
@ -318,7 +316,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
@ -412,7 +410,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
@ -454,7 +452,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// decode input stream thru gzip
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@ -500,13 +498,9 @@ public class AsyncMiddleManServletTest
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new ContentTransformer()
return (input, finished, output) ->
{
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
throw new NullPointerException("explicitly_thrown_by_test");
}
throw new NullPointerException("explicitly_thrown_by_test");
};
}
});
@ -537,7 +531,7 @@ public class AsyncMiddleManServletTest
private int count;
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
if (++count < 2)
output.add(input);
@ -552,16 +546,12 @@ public class AsyncMiddleManServletTest
final CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", serverConnector.getLocalPort())
.content(content)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() && result.getResponse().getStatus() == 502)
latch.countDown();
}
});
.content(content)
.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == 502)
latch.countDown();
});
content.offer(ByteBuffer.allocate(512));
sleep(1000);
@ -578,7 +568,7 @@ public class AsyncMiddleManServletTest
testDownstreamTransformationThrows(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// To trigger the test failure we need that onContent()
// is called twice, so the second time the test throws.
@ -597,7 +587,7 @@ public class AsyncMiddleManServletTest
testDownstreamTransformationThrows(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// To trigger the test failure we need that onContent()
// is called only once, so the the test throws from onSuccess().
@ -621,7 +611,7 @@ public class AsyncMiddleManServletTest
private int count;
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
if (++count < 2)
output.add(input);
@ -660,7 +650,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
OutputStream output = response.getOutputStream();
if (gzipped)
@ -694,25 +684,17 @@ public class AsyncMiddleManServletTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", serverConnector.getLocalPort())
.onResponseContent(new Response.ContentListener()
.onResponseContent((response, content) ->
{
@Override
public void onContent(Response response, ByteBuffer content)
{
// Slow down the reader so that the
// write from the proxy gets congested.
sleep(1);
}
// Slow down the reader so that the
// write from the proxy gets congested.
sleep(1);
})
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
assertTrue(result.isSucceeded());
assertEquals(200, result.getResponse().getStatus());
latch.countDown();
}
assertTrue(result.isSucceeded());
assertEquals(200, result.getResponse().getStatus());
latch.countDown();
});
assertTrue(latch.await(15, TimeUnit.SECONDS));
@ -724,7 +706,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
byte[] chunk = new byte[1024];
int contentLength = 2 * chunk.length;
@ -741,14 +723,10 @@ public class AsyncMiddleManServletTest
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new ContentTransformer()
return (input, finished, output) ->
{
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
if (!finished)
output.add(input);
}
if (!finished)
output.add(input);
};
}
});
@ -779,15 +757,11 @@ public class AsyncMiddleManServletTest
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", serverConnector.getLocalPort())
.content(content)
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
System.err.println(result);
if (result.getResponse().getStatus() == 500)
latch.countDown();
}
System.err.println(result);
if (result.getResponse().getStatus() == 500)
latch.countDown();
});
content.offer(ByteBuffer.allocate(512));
sleep(1000);
@ -821,16 +795,12 @@ public class AsyncMiddleManServletTest
final CountDownLatch latch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", serverConnector.getLocalPort())
.content(content)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.getResponse().getStatus() == 502)
latch.countDown();
}
});
.content(content)
.send(result ->
{
if (result.getResponse().getStatus() == 502)
latch.countDown();
});
content.offer(ByteBuffer.allocate(512));
sleep(1000);
content.offer(ByteBuffer.allocate(512));
@ -857,7 +827,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[512]);
@ -898,7 +868,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
@ -916,7 +886,7 @@ public class AsyncMiddleManServletTest
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8));
// Transform the object.
obj.put(key2, obj.remove(key1));
try (OutputStream output = sink.getOutputStream())
@ -961,7 +931,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
// Write the content in two chunks.
int chunk = data.length / 2;
@ -1021,7 +991,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
@ -1041,7 +1011,7 @@ public class AsyncMiddleManServletTest
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8));
// Transform the object.
obj.put(key2, obj.remove(key1));
try (OutputStream output = sink.getOutputStream())
@ -1080,7 +1050,7 @@ public class AsyncMiddleManServletTest
}
// File deletion is delayed on windows, testing for deletion is not going to work
if(!OS.WINDOWS.isCurrentOs())
if (!OS.WINDOWS.isCurrentOs())
{
try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*"))
{
@ -1097,7 +1067,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
IO.copy(request.getInputStream(), IO.getNullStream());
}
@ -1163,7 +1133,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
response.setContentLength(2);
@ -1237,7 +1207,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
@ -1260,7 +1230,7 @@ public class AsyncMiddleManServletTest
if (readSource)
{
InputStream input = source.getInputStream();
JSON.parse(new InputStreamReader(input, "UTF-8"));
JSON.parse(new InputStreamReader(input, StandardCharsets.UTF_8));
}
// No transformation.
return false;
@ -1289,7 +1259,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
protected void service(HttpServletRequest request, HttpServletResponse response)
{
response.setStatus(HttpStatus.UNAUTHORIZED_401);
response.setHeader(HttpHeader.WWW_AUTHENTICATE.asString(), "Basic realm=\"test\"");
@ -1304,7 +1274,7 @@ public class AsyncMiddleManServletTest
return new AfterContentTransformer()
{
@Override
public boolean transform(Source source, Sink sink) throws IOException
public boolean transform(Source source, Sink sink)
{
transformed.set(true);
return false;
@ -1433,7 +1403,7 @@ public class AsyncMiddleManServletTest
private ByteBuffer buffer;
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
// Buffer only the first chunk.
if (buffer == null)
@ -1495,7 +1465,7 @@ public class AsyncMiddleManServletTest
startServer(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
{
if (req.getHeader("Via") != null)
resp.addHeader(PROXIED_HEADER, "true");
@ -1503,9 +1473,11 @@ public class AsyncMiddleManServletTest
}
});
final String proxyTo = "http://localhost:" + serverConnector.getLocalPort();
AsyncMiddleManServlet proxyServlet = new AsyncMiddleManServlet.Transparent() {
AsyncMiddleManServlet proxyServlet = new AsyncMiddleManServlet.Transparent()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) {
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return ContentTransformer.IDENTITY;
}
};
@ -1689,7 +1661,7 @@ public class AsyncMiddleManServletTest
private final List<ByteBuffer> buffers = new ArrayList<>();
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
if (input.hasRemaining())
{
@ -1715,7 +1687,7 @@ public class AsyncMiddleManServletTest
private StringBuilder head = new StringBuilder();
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
if (input.hasRemaining() && head != null)
{
@ -1723,12 +1695,12 @@ public class AsyncMiddleManServletTest
if (lnPos == -1)
{
// no linefeed found, copy it all
copyHeadBytes(input,input.limit());
copyHeadBytes(input, input.limit());
}
else
{
// found linefeed
copyHeadBytes(input,lnPos);
copyHeadBytes(input, lnPos);
output.addAll(getHeadBytes());
// mark head as sent
head = null;
@ -1764,7 +1736,7 @@ public class AsyncMiddleManServletTest
private List<ByteBuffer> getHeadBytes()
{
ByteBuffer buf = BufferUtil.toBuffer(head.toString(),StandardCharsets.UTF_8);
ByteBuffer buf = BufferUtil.toBuffer(head.toString(), StandardCharsets.UTF_8);
return Collections.singletonList(buf);
}
}
@ -1772,7 +1744,7 @@ public class AsyncMiddleManServletTest
private static class DiscardContentTransformer implements AsyncMiddleManServlet.ContentTransformer
{
@Override
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
public void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output)
{
}
}