diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java
index 8bae3b8a993..c469d2e4719 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java
@@ -18,26 +18,13 @@
package org.eclipse.jetty.client;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.zip.DataFormatException;
-import java.util.zip.Inflater;
-import java.util.zip.ZipException;
-
-import org.eclipse.jetty.util.BufferUtil;
/**
* {@link ContentDecoder} for the "gzip" encoding.
+ *
*/
-public class GZIPContentDecoder implements ContentDecoder
+public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder
{
- private final Inflater inflater = new Inflater(true);
- private final byte[] bytes;
- private byte[] output;
- private State state;
- private int size;
- private int value;
- private byte flags;
public GZIPContentDecoder()
{
@@ -46,285 +33,7 @@ public class GZIPContentDecoder implements ContentDecoder
public GZIPContentDecoder(int bufferSize)
{
- this.bytes = new byte[bufferSize];
- reset();
- }
-
- /**
- * {@inheritDoc}
- *
If the decoding did not produce any output, for example because it consumed gzip header
- * or trailer bytes, it returns a buffer with zero capacity.
- * This method never returns null.
- * The given {@code buffer}'s position will be modified to reflect the bytes consumed during
- * the decoding.
- * The decoding may be finished without consuming the buffer completely if the buffer contains
- * gzip bytes plus other bytes (either plain or gzipped).
- */
- @Override
- public ByteBuffer decode(ByteBuffer buffer)
- {
- try
- {
- while (buffer.hasRemaining())
- {
- byte currByte = buffer.get();
- switch (state)
- {
- case INITIAL:
- {
- buffer.position(buffer.position() - 1);
- state = State.ID;
- break;
- }
- case ID:
- {
- value += (currByte & 0xFF) << 8 * size;
- ++size;
- if (size == 2)
- {
- if (value != 0x8B1F)
- throw new ZipException("Invalid gzip bytes");
- state = State.CM;
- }
- break;
- }
- case CM:
- {
- if ((currByte & 0xFF) != 0x08)
- throw new ZipException("Invalid gzip compression method");
- state = State.FLG;
- break;
- }
- case FLG:
- {
- flags = currByte;
- state = State.MTIME;
- size = 0;
- value = 0;
- break;
- }
- case MTIME:
- {
- // Skip the 4 MTIME bytes
- ++size;
- if (size == 4)
- state = State.XFL;
- break;
- }
- case XFL:
- {
- // Skip XFL
- state = State.OS;
- break;
- }
- case OS:
- {
- // Skip OS
- state = State.FLAGS;
- break;
- }
- case FLAGS:
- {
- buffer.position(buffer.position() - 1);
- if ((flags & 0x04) == 0x04)
- {
- state = State.EXTRA_LENGTH;
- size = 0;
- value = 0;
- }
- else if ((flags & 0x08) == 0x08)
- state = State.NAME;
- else if ((flags & 0x10) == 0x10)
- state = State.COMMENT;
- else if ((flags & 0x2) == 0x2)
- {
- state = State.HCRC;
- size = 0;
- value = 0;
- }
- else
- state = State.DATA;
- break;
- }
- case EXTRA_LENGTH:
- {
- value += (currByte & 0xFF) << 8 * size;
- ++size;
- if (size == 2)
- state = State.EXTRA;
- break;
- }
- case EXTRA:
- {
- // Skip EXTRA bytes
- --value;
- if (value == 0)
- {
- // Clear the EXTRA flag and loop on the flags
- flags &= ~0x04;
- state = State.FLAGS;
- }
- break;
- }
- case NAME:
- {
- // Skip NAME bytes
- if (currByte == 0)
- {
- // Clear the NAME flag and loop on the flags
- flags &= ~0x08;
- state = State.FLAGS;
- }
- break;
- }
- case COMMENT:
- {
- // Skip COMMENT bytes
- if (currByte == 0)
- {
- // Clear the COMMENT flag and loop on the flags
- flags &= ~0x10;
- state = State.FLAGS;
- }
- break;
- }
- case HCRC:
- {
- // Skip HCRC
- ++size;
- if (size == 2)
- {
- // Clear the HCRC flag and loop on the flags
- flags &= ~0x02;
- state = State.FLAGS;
- }
- break;
- }
- case DATA:
- {
- buffer.position(buffer.position() - 1);
- while (true)
- {
- int decoded = inflate(bytes);
- if (decoded == 0)
- {
- if (inflater.needsInput())
- {
- if (buffer.hasRemaining())
- {
- byte[] input = new byte[buffer.remaining()];
- buffer.get(input);
- inflater.setInput(input);
- }
- else
- {
- if (output != null)
- {
- ByteBuffer result = ByteBuffer.wrap(output);
- output = null;
- return result;
- }
- break;
- }
- }
- else if (inflater.finished())
- {
- int remaining = inflater.getRemaining();
- buffer.position(buffer.limit() - remaining);
- state = State.CRC;
- size = 0;
- value = 0;
- break;
- }
- else
- {
- throw new ZipException("Invalid inflater state");
- }
- }
- else
- {
- if (output == null)
- {
- // Save the inflated bytes and loop to see if we have finished
- output = Arrays.copyOf(bytes, decoded);
- }
- else
- {
- // Accumulate inflated bytes and loop to see if we have finished
- byte[] newOutput = Arrays.copyOf(output, output.length + decoded);
- System.arraycopy(bytes, 0, newOutput, output.length, decoded);
- output = newOutput;
- }
- }
- }
- break;
- }
- case CRC:
- {
- value += (currByte & 0xFF) << 8 * size;
- ++size;
- if (size == 4)
- {
- // From RFC 1952, compliant decoders need not to verify the CRC
- state = State.ISIZE;
- size = 0;
- value = 0;
- }
- break;
- }
- case ISIZE:
- {
- value += (currByte & 0xFF) << 8 * size;
- ++size;
- if (size == 4)
- {
- if (value != inflater.getBytesWritten())
- throw new ZipException("Invalid input size");
-
- ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
- reset();
- return result;
- }
- break;
- }
- default:
- throw new ZipException();
- }
- }
- return BufferUtil.EMPTY_BUFFER;
- }
- catch (ZipException x)
- {
- throw new RuntimeException(x);
- }
- }
-
- private int inflate(byte[] bytes) throws ZipException
- {
- try
- {
- return inflater.inflate(bytes);
- }
- catch (DataFormatException x)
- {
- throw new ZipException(x.getMessage());
- }
- }
-
- private void reset()
- {
- inflater.reset();
- Arrays.fill(bytes, (byte)0);
- output = null;
- state = State.INITIAL;
- size = 0;
- value = 0;
- flags = 0;
- }
-
- protected boolean isFinished()
- {
- return state == State.INITIAL;
+ super(null,bufferSize);
}
/**
@@ -351,9 +60,4 @@ public class GZIPContentDecoder implements ContentDecoder
return new GZIPContentDecoder(bufferSize);
}
}
-
- private enum State
- {
- INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
- }
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java
index 9c1b460d02a..207f6d3e1fe 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java
@@ -33,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.Rule;
import org.junit.Test;
+@Deprecated
public class GZIPContentDecoderTest
{
@Rule
diff --git a/jetty-http/pom.xml b/jetty-http/pom.xml
index 8455a9d3248..9802a9f8a20 100644
--- a/jetty-http/pom.xml
+++ b/jetty-http/pom.xml
@@ -18,6 +18,11 @@
org.eclipse.jetty.toolchain
jetty-test-helper
diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java
new file mode 100644
index 00000000000..49f5aa5b746
--- /dev/null
+++ b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java
@@ -0,0 +1,416 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http;
+
+import java.nio.ByteBuffer;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import java.util.zip.ZipException;
+
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.util.BufferUtil;
+
+/**
+ * Decoder for the "gzip" encoding.
+ * An Decode that inflates gzip compressed data that has been
+ * optimized for async usage with minimal data copies.
+ *
+ */
+public class GZIPContentDecoder
+{
+ private final Inflater _inflater = new Inflater(true);
+ private final ByteBufferPool _pool;
+ private final int _bufferSize;
+ private State _state;
+ private int _size;
+ private int _value;
+ private byte _flags;
+ private ByteBuffer _inflated;
+
+ public GZIPContentDecoder()
+ {
+ this(null,2048);
+ }
+
+ public GZIPContentDecoder(int bufferSize)
+ {
+ this(null,bufferSize);
+ }
+
+ public GZIPContentDecoder(ByteBufferPool pool, int bufferSize)
+ {
+ _bufferSize = bufferSize;
+ _pool = pool;
+ reset();
+ }
+
+ /** Inflate compressed data from a buffer.
+ *
+ * @param compressed Buffer containing compressed data.
+ * @return Buffer containing inflated data.
+ */
+ public ByteBuffer decode(ByteBuffer compressed)
+ {
+ decodeChunks(compressed);
+ if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE )
+ return BufferUtil.EMPTY_BUFFER;
+
+ ByteBuffer result = _inflated;
+ _inflated = null;
+ return result;
+ }
+
+ /** Called when a chunk of data is inflated.
+ *
The default implementation aggregates all the chunks
+ * into a single buffer returned from {@link #decode(ByteBuffer)}.
+ * Derived implementations may choose to consume chunks individually
+ * and return false to prevent further inflation until a subsequent
+ * call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}.
+ *
+ * @param chunk The inflated chunk of data
+ * @return False if inflating should continue, or True if the call
+ * to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)}
+ * should return, allowing back pressure of compressed data.
+ */
+ protected boolean decodedChunk(ByteBuffer chunk)
+ {
+ if (_inflated==null)
+ _inflated=chunk;
+ else
+ {
+ int size = _inflated.remaining() + chunk.remaining();
+ if (size<=_inflated.capacity())
+ {
+ BufferUtil.append(_inflated,chunk);
+ BufferUtil.put(chunk,_inflated);
+ release(chunk);
+ }
+ else
+ {
+ ByteBuffer bigger=_pool==null?BufferUtil.allocate(size):_pool.acquire(size,false);
+ int pos=BufferUtil.flipToFill(bigger);
+ BufferUtil.put(_inflated,bigger);
+ BufferUtil.put(chunk,bigger);
+ BufferUtil.flipToFlush(bigger,pos);
+ release(_inflated);
+ release(chunk);
+ _inflated = bigger;
+ }
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Inflate compressed data.
+ *
Inflation continues until the compressed block end is reached, there is no
+ * more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.
+ * @param compressed Buffer of compressed data to inflate
+ */
+ protected void decodeChunks(ByteBuffer compressed)
+ {
+ ByteBuffer buffer = null;
+ try
+ {
+ while (true)
+ {
+ switch (_state)
+ {
+ case INITIAL:
+ {
+ _state = State.ID;
+ break;
+ }
+
+ case FLAGS:
+ {
+ if ((_flags & 0x04) == 0x04)
+ {
+ _state = State.EXTRA_LENGTH;
+ _size = 0;
+ _value = 0;
+ }
+ else if ((_flags & 0x08) == 0x08)
+ _state = State.NAME;
+ else if ((_flags & 0x10) == 0x10)
+ _state = State.COMMENT;
+ else if ((_flags & 0x2) == 0x2)
+ {
+ _state = State.HCRC;
+ _size = 0;
+ _value = 0;
+ }
+ else
+ {
+ _state = State.DATA;
+ continue;
+ }
+ break;
+ }
+
+ case DATA:
+ {
+ while (true)
+ {
+ if (buffer==null)
+ buffer = acquire();
+
+ try
+ {
+ int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity());
+ buffer.limit(length);
+ }
+ catch (DataFormatException x)
+ {
+ throw new ZipException(x.getMessage());
+ }
+
+ if (buffer.hasRemaining())
+ {
+ ByteBuffer chunk = buffer;
+ buffer = null;
+ if (decodedChunk(chunk))
+ return;
+ }
+ else if (_inflater.needsInput())
+ {
+ if (!compressed.hasRemaining())
+ return;
+ if (compressed.hasArray())
+ {
+ _inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining());
+ compressed.position(compressed.limit());
+ }
+ else
+ {
+ // TODO use the pool
+ byte[] input = new byte[compressed.remaining()];
+ compressed.get(input);
+ _inflater.setInput(input);
+ }
+ }
+ else if (_inflater.finished())
+ {
+ int remaining = _inflater.getRemaining();
+ compressed.position(compressed.limit() - remaining);
+ _state = State.CRC;
+ _size = 0;
+ _value = 0;
+ break;
+ }
+ }
+ continue;
+ }
+
+ default:
+ break;
+ }
+
+ if (!compressed.hasRemaining())
+ break;
+
+ byte currByte = compressed.get();
+ switch (_state)
+ {
+ case ID:
+ {
+ _value += (currByte & 0xFF) << 8 * _size;
+ ++_size;
+ if (_size == 2)
+ {
+ if (_value != 0x8B1F)
+ throw new ZipException("Invalid gzip bytes");
+ _state = State.CM;
+ }
+ break;
+ }
+ case CM:
+ {
+ if ((currByte & 0xFF) != 0x08)
+ throw new ZipException("Invalid gzip compression method");
+ _state = State.FLG;
+ break;
+ }
+ case FLG:
+ {
+ _flags = currByte;
+ _state = State.MTIME;
+ _size = 0;
+ _value = 0;
+ break;
+ }
+ case MTIME:
+ {
+ // Skip the 4 MTIME bytes
+ ++_size;
+ if (_size == 4)
+ _state = State.XFL;
+ break;
+ }
+ case XFL:
+ {
+ // Skip XFL
+ _state = State.OS;
+ break;
+ }
+ case OS:
+ {
+ // Skip OS
+ _state = State.FLAGS;
+ break;
+ }
+ case EXTRA_LENGTH:
+ {
+ _value += (currByte & 0xFF) << 8 * _size;
+ ++_size;
+ if (_size == 2)
+ _state = State.EXTRA;
+ break;
+ }
+ case EXTRA:
+ {
+ // Skip EXTRA bytes
+ --_value;
+ if (_value == 0)
+ {
+ // Clear the EXTRA flag and loop on the flags
+ _flags &= ~0x04;
+ _state = State.FLAGS;
+ }
+ break;
+ }
+ case NAME:
+ {
+ // Skip NAME bytes
+ if (currByte == 0)
+ {
+ // Clear the NAME flag and loop on the flags
+ _flags &= ~0x08;
+ _state = State.FLAGS;
+ }
+ break;
+ }
+ case COMMENT:
+ {
+ // Skip COMMENT bytes
+ if (currByte == 0)
+ {
+ // Clear the COMMENT flag and loop on the flags
+ _flags &= ~0x10;
+ _state = State.FLAGS;
+ }
+ break;
+ }
+ case HCRC:
+ {
+ // Skip HCRC
+ ++_size;
+ if (_size == 2)
+ {
+ // Clear the HCRC flag and loop on the flags
+ _flags &= ~0x02;
+ _state = State.FLAGS;
+ }
+ break;
+ }
+ case CRC:
+ {
+ _value += (currByte & 0xFF) << 8 * _size;
+ ++_size;
+ if (_size == 4)
+ {
+ // From RFC 1952, compliant decoders need not to verify the CRC
+ _state = State.ISIZE;
+ _size = 0;
+ _value = 0;
+ }
+ break;
+ }
+ case ISIZE:
+ {
+ _value += (currByte & 0xFF) << 8 * _size;
+ ++_size;
+ if (_size == 4)
+ {
+ if (_value != _inflater.getBytesWritten())
+ throw new ZipException("Invalid input size");
+
+ // TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
+ reset();
+ return ;
+ }
+ break;
+ }
+ default:
+ throw new ZipException();
+ }
+ }
+ }
+ catch (ZipException x)
+ {
+ throw new RuntimeException(x);
+ }
+ finally
+ {
+ if (buffer!=null)
+ release(buffer);
+ }
+ }
+
+ private void reset()
+ {
+ _inflater.reset();
+ _state = State.INITIAL;
+ _size = 0;
+ _value = 0;
+ _flags = 0;
+ }
+
+ public boolean isFinished()
+ {
+ return _state == State.INITIAL;
+ }
+
+ private enum State
+ {
+ INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
+ }
+
+ /**
+ * @return An indirect buffer of the configured buffersize either from the pool or freshly allocated.
+ */
+ public ByteBuffer acquire()
+ {
+ return _pool==null?BufferUtil.allocate(_bufferSize):_pool.acquire(_bufferSize,false);
+ }
+
+ /**
+ * Release an allocated buffer.
+ *
This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has
+ * been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)}
+ * or passed to {@link #decodedChunk(ByteBuffer)}.
+ * @param buffer The buffer to release.
+ */
+ public void release(ByteBuffer buffer)
+ {
+ if (_pool!=null && buffer!=BufferUtil.EMPTY_BUFFER)
+ _pool.release(buffer);
+ }
+}
diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java
new file mode 100644
index 00000000000..2c2d7195baa
--- /dev/null
+++ b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java
@@ -0,0 +1,343 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.eclipse.jetty.io.ArrayByteBufferPool;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+
+public class GZIPContentDecoderTest
+{
+ @Rule
+ public final TestTracker tracker = new TestTracker();
+
+
+ ArrayByteBufferPool pool;
+ AtomicInteger buffers = new AtomicInteger(0);
+
+ @Before
+ public void beforeClass() throws Exception
+ {
+ buffers.set(0);
+ pool = new ArrayByteBufferPool()
+ {
+
+ @Override
+ public ByteBuffer acquire(int size, boolean direct)
+ {
+ buffers.incrementAndGet();
+ return super.acquire(size,direct);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer)
+ {
+ buffers.decrementAndGet();
+ super.release(buffer);
+ }
+
+ };
+ }
+
+ @After
+ public void afterClass() throws Exception
+ {
+ assertEquals(0,buffers.get());
+ }
+
+
+ @Test
+ public void testStreamNoBlocks() throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
+ int read = input.read();
+ assertEquals(-1, read);
+ }
+
+ @Test
+ public void testStreamBigBlockOneByteAtATime() throws Exception
+ {
+ String data = "0123456789ABCDEF";
+ for (int i = 0; i < 10; ++i)
+ data += data;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ baos = new ByteArrayOutputStream();
+ GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
+ int read;
+ while ((read = input.read()) >= 0)
+ baos.write(read);
+ assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testNoBlocks() throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
+ assertEquals(0, decoded.remaining());
+ }
+
+ @Test
+ public void testSmallBlock() throws Exception
+ {
+ String data = "0";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
+ assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
+ decoder.release(decoded);
+ }
+
+ @Test
+ public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception
+ {
+ String data = "0";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ // The header is 10 bytes, chunk at 11 bytes
+ byte[] bytes1 = new byte[11];
+ System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
+ byte[] bytes2 = new byte[bytes.length - bytes1.length];
+ System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
+ assertEquals(0, decoded.capacity());
+ decoded = decoder.decode(ByteBuffer.wrap(bytes2));
+ assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
+ decoder.release(decoded);
+ }
+
+ @Test
+ public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception
+ {
+ String data = "0";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ // The trailer is 8 bytes, chunk the last 9 bytes
+ byte[] bytes1 = new byte[bytes.length - 9];
+ System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
+ byte[] bytes2 = new byte[bytes.length - bytes1.length];
+ System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
+ assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
+ assertFalse(decoder.isFinished());
+ decoder.release(decoded);
+ decoded = decoder.decode(ByteBuffer.wrap(bytes2));
+ assertEquals(0, decoded.remaining());
+ assertTrue(decoder.isFinished());
+ decoder.release(decoded);
+ }
+
+ @Test
+ public void testSmallBlockWithGZIPTrailerChunked() throws Exception
+ {
+ String data = "0";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ // The trailer is 4+4 bytes, chunk the last 3 bytes
+ byte[] bytes1 = new byte[bytes.length - 3];
+ System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
+ byte[] bytes2 = new byte[bytes.length - bytes1.length];
+ System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
+ assertEquals(0, decoded.capacity());
+ decoder.release(decoded);
+ decoded = decoder.decode(ByteBuffer.wrap(bytes2));
+ assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
+ decoder.release(decoded);
+ }
+
+ @Test
+ public void testTwoSmallBlocks() throws Exception
+ {
+ String data1 = "0";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data1.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes1 = baos.toByteArray();
+
+ String data2 = "1";
+ baos = new ByteArrayOutputStream();
+ output = new GZIPOutputStream(baos);
+ output.write(data2.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes2 = baos.toByteArray();
+
+ byte[] bytes = new byte[bytes1.length + bytes2.length];
+ System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
+ System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
+
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ ByteBuffer decoded = decoder.decode(buffer);
+ assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
+ assertTrue(decoder.isFinished());
+ assertTrue(buffer.hasRemaining());
+ decoder.release(decoded);
+ decoded = decoder.decode(buffer);
+ assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString());
+ assertTrue(decoder.isFinished());
+ assertFalse(buffer.hasRemaining());
+ decoder.release(decoded);
+ }
+
+ @Test
+ public void testBigBlock() throws Exception
+ {
+ String data = "0123456789ABCDEF";
+ for (int i = 0; i < 10; ++i)
+ data += data;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ String result = "";
+ GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ while (buffer.hasRemaining())
+ {
+ ByteBuffer decoded = decoder.decode(buffer);
+ result += StandardCharsets.UTF_8.decode(decoded).toString();
+ decoder.release(decoded);
+ }
+ assertEquals(data, result);
+ }
+
+ @Test
+ public void testBigBlockOneByteAtATime() throws Exception
+ {
+ String data = "0123456789ABCDEF";
+ for (int i = 0; i < 10; ++i)
+ data += data;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ String result = "";
+ GZIPContentDecoder decoder = new GZIPContentDecoder(64);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ while (buffer.hasRemaining())
+ {
+ ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()}));
+ if (decoded.hasRemaining())
+ result += StandardCharsets.UTF_8.decode(decoded).toString();
+ decoder.release(decoded);
+ }
+ assertEquals(data, result);
+ assertTrue(decoder.isFinished());
+ }
+
+ @Test
+ public void testBigBlockWithExtraBytes() throws Exception
+ {
+ String data1 = "0123456789ABCDEF";
+ for (int i = 0; i < 10; ++i)
+ data1 += data1;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data1.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes1 = baos.toByteArray();
+
+ String data2 = "HELLO";
+ byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8);
+
+ byte[] bytes = new byte[bytes1.length + bytes2.length];
+ System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
+ System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
+
+ String result = "";
+ GZIPContentDecoder decoder = new GZIPContentDecoder(64);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ while (buffer.hasRemaining())
+ {
+ ByteBuffer decoded = decoder.decode(buffer);
+ if (decoded.hasRemaining())
+ result += StandardCharsets.UTF_8.decode(decoded).toString();
+ decoder.release(decoded);
+ if (decoder.isFinished())
+ break;
+ }
+ assertEquals(data1, result);
+ assertTrue(buffer.hasRemaining());
+ assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString());
+ }
+}
diff --git a/jetty-server/src/main/config/etc/jetty-gzip.xml b/jetty-server/src/main/config/etc/jetty-gzip.xml
index 93b41fd6396..6e5573449e7 100644
--- a/jetty-server/src/main/config/etc/jetty-gzip.xml
+++ b/jetty-server/src/main/config/etc/jetty-gzip.xml
@@ -15,6 +15,8 @@
+
diff --git a/jetty-server/src/main/config/modules/gzip.mod b/jetty-server/src/main/config/modules/gzip.mod
index 65663a16066..d0173efa60f 100644
--- a/jetty-server/src/main/config/modules/gzip.mod
+++ b/jetty-server/src/main/config/modules/gzip.mod
@@ -20,3 +20,6 @@ etc/jetty-gzip.xml
## User agents for which gzip is disabled
# jetty.gzip.excludedUserAgent=.*MSIE.6\.0.*
+
+## Inflate request buffer size, or 0 for no request inflation
+# jetty.gzip.inflateBufferSize=0
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
index ffc727f060e..c090daab2d0 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
@@ -25,6 +25,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,23 +40,75 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
*
- * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
- * maintains two states: the content state that tells whether there is content to consume and the EOF
- * state that tells whether an EOF has arrived.
- * Only once the content has been consumed the content state is moved to the EOF state.
+ * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class maintains two states: the content state that tells
+ * whether there is content to consume and the EOF state that tells whether an EOF has arrived. Only once the content has been consumed the content state is
+ * moved to the EOF state.
+ */
+/**
+ * @author gregw
+ *
*/
public class HttpInput extends ServletInputStream implements Runnable
{
+ /**
+ * An interceptor for HTTP Request input.
+ *
+ * Unlike inputstream wrappers that can be applied by filters, an interceptor
+ * is entirely transparent and works with async IO APIs.
+ * @see HttpInput#setInterceptor(Interceptor)
+ * @see HttpInput#addInterceptor(Interceptor)
+ */
+ public interface Interceptor
+ {
+ Content readFrom(Content content);
+ }
+
+ /**
+ * An {@link Interceptor} that chains two other {@link Interceptor}s together.
+ * The {@link #readFrom(Content)} calls the previous {@link Interceptor}'s
+ * {@link #readFrom(Content)} and then passes any {@link Content} returned
+ * to the next {@link Interceptor}.
+ */
+ public static class ChainedInterceptor implements Interceptor
+ {
+ private final Interceptor _prev;
+ private final Interceptor _next;
+
+ public ChainedInterceptor(Interceptor prev, Interceptor next)
+ {
+ _prev = prev;
+ _next = next;
+ }
+
+ public Interceptor getPrev()
+ {
+ return _prev;
+ }
+
+ public Interceptor getNext()
+ {
+ return _next;
+ }
+
+ @Override
+ public Content readFrom(Content content)
+ {
+ return _next.readFrom(_prev.readFrom(content));
+ }
+ }
+
+
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
+ private Content _content;
+ private Content _intercepted;
private final Deque _inputQ = new ArrayDeque<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
@@ -64,6 +117,7 @@ public class HttpInput extends ServletInputStream implements Runnable
private long _contentArrived;
private long _contentConsumed;
private long _blockUntil;
+ private Interceptor _interceptor;
public HttpInput(HttpChannelState state)
{
@@ -79,6 +133,9 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
+ if (_content!=null)
+ _content.failed(null);
+ _content = null;
Content item = _inputQ.poll();
while (item != null)
{
@@ -91,9 +148,40 @@ public class HttpInput extends ServletInputStream implements Runnable
_contentConsumed = 0;
_firstByteTimeStamp = -1;
_blockUntil = 0;
+ _interceptor = null;
}
}
+ /**
+ * @return The current Interceptor, or null if none set
+ */
+ public Interceptor getInterceptor()
+ {
+ return _interceptor;
+ }
+
+ /**
+ * Set the interceptor.
+ * @param interceptor The interceptor to use.
+ */
+ public void setInterceptor(Interceptor interceptor)
+ {
+ _interceptor = interceptor;
+ }
+
+ /**
+ * Set the {@link Interceptor}, using a {@link ChainedInterceptor} if
+ * an {@link Interceptor} is already set.
+ * @param interceptor the next {@link Interceptor} in a chain
+ */
+ public void addInterceptor(Interceptor interceptor)
+ {
+ if (_interceptor == null)
+ _interceptor = interceptor;
+ else
+ _interceptor = new ChainedInterceptor(_interceptor,interceptor);
+ }
+
@Override
public int available()
{
@@ -101,8 +189,9 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
- Content content = _inputQ.peek();
- if (content == null)
+ if (_content == null)
+ _content = _inputQ.poll();
+ if (_content == null)
{
try
{
@@ -112,11 +201,12 @@ public class HttpInput extends ServletInputStream implements Runnable
{
woken = failed(e);
}
- content = _inputQ.peek();
+ if (_content == null)
+ _content = _inputQ.poll();
}
- if (content != null)
- available = remaining(content);
+ if (_content != null)
+ available = _content.remaining();
}
if (woken)
@@ -139,10 +229,10 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int read() throws IOException
{
- int read = read(_oneByteBuffer, 0, 1);
+ int read = read(_oneByteBuffer,0,1);
if (read == 0)
throw new IllegalStateException("unready read=0");
- return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
+ return read < 0?-1:_oneByteBuffer[0] & 0xFF;
}
@Override
@@ -168,7 +258,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data)
- throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate));
+ throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate));
}
}
@@ -177,11 +267,12 @@ public class HttpInput extends ServletInputStream implements Runnable
Content item = nextContent();
if (item != null)
{
- int l = get(item, b, off, len);
+ int l = get(item,b,off,len);
if (LOG.isDebugEnabled())
- LOG.debug("{} read {} from {}", this, l, item);
+ LOG.debug("{} read {} from {}",this,l,item);
- consumeNonContent();
+ // Consume any following poison pills
+ pollReadableContent();
return l;
}
@@ -193,191 +284,207 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
- * Called when derived implementations should attempt to
- * produce more Content and add it via {@link #addContent(Content)}.
- * For protocols that are constantly producing (eg HTTP2) this can
- * be left as a noop;
+ * Called when derived implementations should attempt to produce more Content and add it via {@link #addContent(Content)}. For protocols that are constantly
+ * producing (eg HTTP2) this can be left as a noop;
*
- * @throws IOException if unable to produce content
+ * @throws IOException
+ * if unable to produce content
*/
protected void produceContent() throws IOException
{
}
/**
- * Get the next content from the inputQ, calling {@link #produceContent()}
- * if need be. EOF is processed and state changed.
+ * Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed.
*
* @return the content or null if none available.
- * @throws IOException if retrieving the content fails
+ * @throws IOException
+ * if retrieving the content fails
*/
protected Content nextContent() throws IOException
{
- Content content = pollContent();
+ Content content = pollNonEmptyContent();
if (content == null && !isFinished())
{
produceContent();
- content = pollContent();
+ content = pollNonEmptyContent();
}
return content;
}
/**
- * Poll the inputQ for Content.
- * Consumed buffers and {@link PoisonPillContent}s are removed and
- * EOF state updated if need be.
+ * Poll the inputQ for Content. Consumed buffers and {@link SentinelContent}s are removed and EOF state updated if need be.
*
* @return Content or null
*/
- protected Content pollContent()
+ protected Content pollNonEmptyContent()
{
- // Items are removed only when they are fully consumed.
- Content content = _inputQ.peek();
- // Skip consumed items at the head of the queue.
- while (content != null && remaining(content) == 0)
- {
- _inputQ.poll();
- content.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("{} consumed {}", this, content);
-
- if (content == EOF_CONTENT)
+ while (true)
+ {
+ // Get the next content (or EOF)
+ Content content = pollReadableContent();
+
+ // If it is EOF, consume it here
+ if (content instanceof SentinelContent)
{
- if (_listener == null)
- _state = EOF;
- else
+ if (content == EARLY_EOF_CONTENT)
+ _state = EARLY_EOF;
+ else if (content instanceof EofContent)
{
- _state = AEOF;
- boolean woken = _channelState.onReadReady(); // force callback?
- if (woken)
- wake();
+ if (_listener == null)
+ _state = EOF;
+ else
+ {
+ _state = AEOF;
+ boolean woken = _channelState.onReadReady(); // force callback?
+ if (woken)
+ wake();
+ }
}
+
+ // Consume the EOF content, either if it was original content
+ // or if it was produced by interception
+ content.succeeded();
+ if (_content==content)
+ _content = null;
+ else if (_intercepted==content)
+ _intercepted = null;
+ continue;
}
- else if (content == EARLY_EOF_CONTENT)
- _state = EARLY_EOF;
- content = _inputQ.peek();
+ return content;
}
-
- return content;
+
}
/**
+ * Poll the inputQ for Content or EOF. Consumed buffers and non EOF {@link SentinelContent}s are removed. EOF state is not updated.
+ * Interception is done within this method.
+ * @return Content with remaining, a {@link SentinelContent}, or null
*/
- protected void consumeNonContent()
+ protected Content pollReadableContent()
{
- // Items are removed only when they are fully consumed.
- Content content = _inputQ.peek();
- // Skip consumed items at the head of the queue.
- while (content != null && remaining(content) == 0)
+ // If we have a chunk produced by interception
+ if (_intercepted!=null)
{
- // Defer EOF until read
- if (content instanceof EofContent)
- break;
-
- // Consume all other empty content
- _inputQ.poll();
- content.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("{} consumed {}", this, content);
- content = _inputQ.peek();
+ // Use it if it has any remaining content
+ if (_intercepted.hasContent())
+ return _intercepted;
+
+ // succeed the chunk
+ _intercepted.succeeded();
+ _intercepted=null;
}
+
+ // If we don't have a Content under consideration, get
+ // the next one off the input Q.
+ if (_content == null)
+ _content = _inputQ.poll();
+
+ // While we have content to consider.
+ while (_content!=null)
+ {
+ // Are we intercepting?
+ if (_interceptor!=null)
+ {
+ // Intercept the current content (may be called several
+ // times for the same content
+ _intercepted = _interceptor.readFrom(_content);
+
+ // If interception produced new content
+ if (_intercepted!=null && _intercepted!=_content)
+ {
+ // if it is not empty use it
+ if (_intercepted.hasContent())
+ return _intercepted;
+ _intercepted.succeeded();
+ }
+
+ // intercepted content consumed
+ _intercepted=null;
+
+ // fall through so that the unintercepted _content is
+ // considered for any remaining content, for EOF and to
+ // succeed it if it is entirely consumed.
+ }
+
+ // If the content has content or is an EOF marker, use it
+ if (_content.hasContent() || _content instanceof SentinelContent)
+ return _content;
+
+ // The content is consumed, so get the next one. Note that EOF
+ // content is never consumed here, but in #pollContent
+ _content.succeeded();
+ _content = _inputQ.poll();
+ }
+
+ return null;
+
}
+
/**
- * Get the next readable from the inputQ, calling {@link #produceContent()}
- * if need be. EOF is NOT processed and state is not changed.
+ * Get the next readable from the inputQ, calling {@link #produceContent()} if need be. EOF is NOT processed and state is not changed.
*
* @return the content or EOF or null if none available.
- * @throws IOException if retrieving the content fails
+ * @throws IOException
+ * if retrieving the content fails
*/
protected Content nextReadable() throws IOException
{
- Content content = pollReadable();
+ Content content = pollReadableContent();
if (content == null && !isFinished())
{
produceContent();
- content = pollReadable();
+ content = pollReadableContent();
}
return content;
}
- /**
- * Poll the inputQ for Content or EOF.
- * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
- * EOF state is not updated.
- *
- * @return Content, EOF or null
- */
- protected Content pollReadable()
- {
- // Items are removed only when they are fully consumed.
- Content content = _inputQ.peek();
-
- // Skip consumed items at the head of the queue except EOF
- while (content != null)
- {
- if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0)
- return content;
-
- _inputQ.poll();
- content.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("{} consumed {}", this, content);
- content = _inputQ.peek();
- }
-
- return null;
- }
-
- /**
- * @param item the content
- * @return how many bytes remain in the given content
- */
- protected int remaining(Content item)
- {
- return item.remaining();
- }
/**
* Copies the given content into the given byte buffer.
*
- * @param content the content to copy from
- * @param buffer the buffer to copy into
- * @param offset the buffer offset to start copying from
- * @param length the space available in the buffer
+ * @param content
+ * the content to copy from
+ * @param buffer
+ * the buffer to copy into
+ * @param offset
+ * the buffer offset to start copying from
+ * @param length
+ * the space available in the buffer
* @return the number of bytes actually copied
*/
protected int get(Content content, byte[] buffer, int offset, int length)
{
- int l = Math.min(content.remaining(), length);
- content.getContent().get(buffer, offset, l);
+ int l = content.get(buffer,offset,length);
_contentConsumed += l;
return l;
}
/**
- * Consumes the given content.
- * Calls the content succeeded if all content consumed.
+ * Consumes the given content. Calls the content succeeded if all content consumed.
*
- * @param content the content to consume
- * @param length the number of bytes to consume
+ * @param content
+ * the content to consume
+ * @param length
+ * the number of bytes to consume
*/
protected void skip(Content content, int length)
{
- int l = Math.min(content.remaining(), length);
- ByteBuffer buffer = content.getContent();
- buffer.position(buffer.position() + l);
+ int l = content.skip(length);
+
_contentConsumed += l;
- if (l > 0 && !content.hasContent())
- pollContent(); // hungry succeed
+ if (l > 0 && content.isEmpty())
+ pollNonEmptyContent(); // hungry succeed
}
/**
* Blocks until some content or some end-of-file event arrives.
*
- * @throws IOException if the wait is interrupted
+ * @throws IOException
+ * if the wait is interrupted
*/
protected void blockForContent() throws IOException
{
@@ -392,7 +499,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
if (LOG.isDebugEnabled())
- LOG.debug("{} blocking for content timeout={}", this, timeout);
+ LOG.debug("{} blocking for content timeout={}",this,timeout);
if (timeout > 0)
_inputQ.wait(timeout);
else
@@ -402,7 +509,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
- throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
+ throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout()));
}
catch (Throwable e)
{
@@ -412,11 +519,12 @@ public class HttpInput extends ServletInputStream implements Runnable
/**
* Adds some content to the start of this input stream.
- * Typically used to push back content that has
- * been read, perhaps mutated. The bytes prepended are
- * deducted for the contentConsumed total
+ *
+ * Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total
+ *
*
- * @param item the content to add
+ * @param item
+ * the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
@@ -424,44 +532,54 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
- _inputQ.push(item);
+ if (_content != null)
+ _inputQ.push(_content);
+ _content = item;
_contentConsumed -= item.remaining();
if (LOG.isDebugEnabled())
- LOG.debug("{} prependContent {}", this, item);
+ LOG.debug("{} prependContent {}",this,item);
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onReadPossible();
}
-
return woken;
}
/**
* Adds some content to this input stream.
*
- * @param item the content to add
+ * @param content
+ * the content to add
* @return true if content channel woken for read
*/
- public boolean addContent(Content item)
+ public boolean addContent(Content content)
{
boolean woken = false;
synchronized (_inputQ)
{
if (_firstByteTimeStamp == -1)
_firstByteTimeStamp = System.nanoTime();
- _contentArrived += item.remaining();
- _inputQ.offer(item);
- if (LOG.isDebugEnabled())
- LOG.debug("{} addContent {}", this, item);
- if (_listener == null)
- _inputQ.notify();
+ _contentArrived += content.remaining();
+
+ if (_content==null && _inputQ.isEmpty())
+ _content=content;
else
- woken = _channelState.onReadPossible();
- }
+ _inputQ.offer(content);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} addContent {}",this,content);
+ if (pollReadableContent()!=null)
+ {
+ if (_listener == null)
+ _inputQ.notify();
+ else
+ woken = _channelState.onReadPossible();
+ }
+ }
return woken;
}
@@ -469,7 +587,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
- return _inputQ.size() > 0;
+ return _content!=null || _inputQ.size() > 0;
}
}
@@ -490,11 +608,9 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
- * This method should be called to signal that an EOF has been
- * detected before all the expected content arrived.
+ * This method should be called to signal that an EOF has been detected before all the expected content arrived.
*
- * Typically this will result in an EOFException being thrown
- * from a subsequent read rather than a -1 return.
+ * Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return.
*
* @return true if content channel woken for read
*/
@@ -504,8 +620,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
- * This method should be called to signal that all the expected
- * content arrived.
+ * This method should be called to signal that all the expected content arrived.
*
* @return true if content channel woken for read
*/
@@ -526,7 +641,7 @@ public class HttpInput extends ServletInputStream implements Runnable
if (item == null)
break; // Let's not bother blocking
- skip(item, remaining(item));
+ skip(item,item.remaining());
}
return isFinished() && !isError();
}
@@ -641,11 +756,8 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/*
- *
- * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
- * runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
- * to setup classloaders etc.
- *
+ * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
+ * ContextHandler#handle(Runnable)} to setup classloaders etc.
*/
@Override
public void run()
@@ -666,7 +778,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
listener = _listener;
- error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null;
+ error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
}
try
@@ -721,19 +833,24 @@ public class HttpInput extends ServletInputStream implements Runnable
content = _inputQ.peekFirst();
}
return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]",
- getClass().getSimpleName(),
- hashCode(),
- consumed,
- q,
- content,
- state);
+ getClass().getSimpleName(),
+ hashCode(),
+ consumed,
+ q,
+ content,
+ state);
}
- public static class PoisonPillContent extends Content
+ /**
+ * A Sentinel Content, which has zero length content but
+ * indicates some other event in the input stream (eg EOF)
+ *
+ */
+ public static class SentinelContent extends Content
{
private final String _name;
- public PoisonPillContent(String name)
+ public SentinelContent(String name)
{
super(BufferUtil.EMPTY_BUFFER);
_name = name;
@@ -746,7 +863,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
}
- public static class EofContent extends PoisonPillContent
+ public static class EofContent extends SentinelContent
{
EofContent(String name)
{
@@ -756,22 +873,36 @@ public class HttpInput extends ServletInputStream implements Runnable
public static class Content implements Callback
{
- private final ByteBuffer _content;
+ protected final ByteBuffer _content;
public Content(ByteBuffer content)
{
_content = content;
}
+ public ByteBuffer getByteBuffer()
+ {
+ return _content;
+ }
+
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
- public ByteBuffer getContent()
+ public int get(byte[] buffer, int offset, int length)
{
- return _content;
+ length = Math.min(_content.remaining(),length);
+ _content.get(buffer,offset,length);
+ return length;
+ }
+
+ public int skip(int length)
+ {
+ length = Math.min(_content.remaining(),length);
+ _content.position(_content.position() + length);
+ return length;
}
public boolean hasContent()
@@ -783,15 +914,19 @@ public class HttpInput extends ServletInputStream implements Runnable
{
return _content.remaining();
}
+
+ public boolean isEmpty()
+ {
+ return !_content.hasRemaining();
+ }
@Override
public String toString()
{
- return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content));
+ return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
}
}
-
protected static abstract class State
{
public boolean blockForContent(HttpInput in) throws IOException
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index c9a1354c73f..9d1ec89924b 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -175,9 +175,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _interceptor;
}
- public void setInterceptor(Interceptor filter)
+ public void setInterceptor(Interceptor interceptor)
{
- _interceptor = filter;
+ _interceptor = interceptor;
}
public boolean isWritten()
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java
index 1206f1a6c06..3d5ff8df02d 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java
@@ -65,6 +65,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private int _compressionLevel=Deflater.DEFAULT_COMPRESSION;
private boolean _checkGzExists = true;
private boolean _syncFlush = false;
+ private int _inflateBufferSize = -1;
// non-static, as other GzipHandler instances may have different configurations
private final ThreadLocal _deflater = new ThreadLocal<>();
@@ -77,6 +78,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private HttpField _vary;
+
/* ------------------------------------------------------------ */
/**
* Instantiates a new gzip handler.
@@ -398,6 +400,24 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
return _vary;
}
+ /* ------------------------------------------------------------ */
+ /**
+ * @return size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
+ */
+ public int getInflateBufferSize()
+ {
+ return _inflateBufferSize;
+ }
+
+ /* ------------------------------------------------------------ */
+ /**
+ * @param size size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
+ */
+ public void setInflateBufferSize(int size)
+ {
+ _inflateBufferSize = size;
+ }
+
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
@@ -409,6 +429,19 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
String path = context==null?baseRequest.getRequestURI():URIUtil.addPaths(baseRequest.getServletPath(),baseRequest.getPathInfo());
LOG.debug("{} handle {} in {}",this,baseRequest,context);
+ // Handle request inflation
+ if (_inflateBufferSize>0)
+ {
+ HttpField ce = baseRequest.getHttpFields().getField(HttpHeader.CONTENT_ENCODING);
+ if (ce!=null && "gzip".equalsIgnoreCase(ce.getValue()))
+ {
+ // TODO should check ce.contains and then remove just the gzip encoding
+ baseRequest.getHttpFields().remove(HttpHeader.CONTENT_ENCODING);
+ baseRequest.getHttpFields().add(new HttpField("X-Content-Encoding",ce.getValue()));
+ baseRequest.getHttpInput().addInterceptor(new GzipHttpInputInterceptor(baseRequest.getHttpChannel().getByteBufferPool(),_inflateBufferSize));
+ }
+ }
+
HttpOutput out = baseRequest.getResponse().getHttpOutput();
// Are we already being gzipped?
HttpOutput.Interceptor interceptor = out.getInterceptor();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java
new file mode 100644
index 00000000000..97198e736f6
--- /dev/null
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java
@@ -0,0 +1,83 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.server.handler.gzip;
+
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.http.GZIPContentDecoder;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.server.HttpInput;
+import org.eclipse.jetty.server.HttpInput.Content;
+
+/**
+ * A HttpInput Interceptor that inflates GZIP encoded request content.
+ *
+ */
+public class GzipHttpInputInterceptor implements HttpInput.Interceptor
+{
+ class Decoder extends GZIPContentDecoder
+ {
+ public Decoder(ByteBufferPool pool, int bufferSize)
+ {
+ super(pool,bufferSize);
+ }
+
+ @Override
+ protected boolean decodedChunk(final ByteBuffer chunk)
+ {
+ _chunk = chunk;
+ return true;
+ }
+
+ @Override
+ public void decodeChunks(ByteBuffer compressed)
+ {
+ _chunk = null;
+ super.decodeChunks(compressed);
+ }
+ }
+
+ private final Decoder _decoder;
+ private ByteBuffer _chunk;
+
+ public GzipHttpInputInterceptor(ByteBufferPool pool, int bufferSize)
+ {
+ _decoder = new Decoder(pool,bufferSize);
+ }
+
+ @Override
+ public Content readFrom(Content content)
+ {
+ _decoder.decodeChunks(content.getByteBuffer());
+ final ByteBuffer chunk = _chunk;
+
+ if (chunk==null)
+ return null;
+
+ return new Content(chunk)
+ {
+ @Override
+ public void succeeded()
+ {
+ _decoder.release(chunk);
+ }
+ };
+ }
+
+}
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java
index 985d8c01bed..820e7ebf0c2 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java
@@ -354,6 +354,7 @@ public class AsyncRequestReadTest
for (int i=read;i-->0;)
{
int c=in.read();
+ // System.err.println("in="+c);
if (c<0)
break;
out.write(c);
diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java
index e892f389e5a..7b1ef878a5a 100644
--- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java
+++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java
@@ -33,8 +33,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -45,6 +47,7 @@ import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.After;
@@ -86,6 +89,7 @@ public class GzipHandlerTest
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setExcludedAgentPatterns();
gzipHandler.setMinGzipSize(16);
+ gzipHandler.setInflateBufferSize(4096);
ServletContextHandler context = new ServletContextHandler(gzipHandler,"/ctx");
ServletHandler servlets = context.getServletHandler();
@@ -97,6 +101,7 @@ public class GzipHandlerTest
servlets.addServletWithMapping(TestServlet.class,"/content");
servlets.addServletWithMapping(ForwardServlet.class,"/forward");
servlets.addServletWithMapping(IncludeServlet.class,"/include");
+ servlets.addServletWithMapping(EchoServlet.class,"/echo/*");
_server.start();
}
@@ -147,6 +152,21 @@ public class GzipHandlerTest
}
}
}
+
+ public static class EchoServlet extends HttpServlet
+ {
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
+ {
+ response.setContentType(req.getContentType());
+ IO.copy(req.getInputStream(),response.getOutputStream());
+ }
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
+ {
+ doGet(req,response);
+ }
+ }
public static class ForwardServlet extends HttpServlet
{
@@ -392,4 +412,68 @@ public class GzipHandlerTest
assertThat("Included Paths.size", includedPaths.length, is(2));
assertThat("Included Paths", Arrays.asList(includedPaths), contains("/foo","^/bar.*$"));
}
+
+
+ @Test
+ public void testGzipRequest() throws Exception
+ {
+ String data = "Hello Nice World! ";
+ for (int i = 0; i < 10; ++i)
+ data += data;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data.getBytes(StandardCharsets.UTF_8));
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ // generated and parsed test
+ HttpTester.Request request = HttpTester.newRequest();
+ HttpTester.Response response;
+
+ request.setMethod("POST");
+ request.setURI("/ctx/echo");
+ request.setVersion("HTTP/1.0");
+ request.setHeader("Host","tester");
+ request.setHeader("Content-Type","text/plain");
+ request.setHeader("Content-Encoding","gzip");
+ request.setContent(bytes);
+
+ response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
+
+ assertThat(response.getStatus(),is(200));
+ assertThat(response.getContent(),is(data));
+
+ }
+
+ @Test
+ public void testGzipBomb() throws Exception
+ {
+ byte[] data = new byte[512*1024];
+ Arrays.fill(data,(byte)'X');
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ output.write(data);
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ // generated and parsed test
+ HttpTester.Request request = HttpTester.newRequest();
+ HttpTester.Response response;
+
+ request.setMethod("POST");
+ request.setURI("/ctx/echo");
+ request.setVersion("HTTP/1.0");
+ request.setHeader("Host","tester");
+ request.setHeader("Content-Type","text/plain");
+ request.setHeader("Content-Encoding","gzip");
+ request.setContent(bytes);
+
+ response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
+ // TODO need to test back pressure works
+
+ assertThat(response.getStatus(),is(200));
+ assertThat(response.getContentBytes().length,is(512*1024));
+ }
+
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
index 61f38956422..1110eac8f64 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.http.client;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
@@ -58,9 +59,12 @@ import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
@@ -68,6 +72,8 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
+import static java.nio.ByteBuffer.wrap;
+import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -1038,4 +1044,174 @@ public class AsyncIOServletTest extends AbstractTest
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
+
+
+ @Test
+ public void testAsyncIntercepted() throws Exception
+ {
+ start(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ System.err.println("Service "+request);
+
+ final HttpInput httpInput = ((Request)request).getHttpInput();
+ httpInput.addInterceptor(new HttpInput.Interceptor()
+ {
+ int state = 0;
+ Content saved;
+
+ @Override
+ public Content readFrom(Content content)
+ {
+ // System.err.printf("readFrom s=%d saved=%b %s%n",state,saved!=null,content);
+ switch(state)
+ {
+ case 0:
+ // null transform
+ if (content.isEmpty())
+ state++;
+ return null;
+
+ case 1:
+ {
+ // copy transform
+ if (content.isEmpty())
+ {
+ state++;
+ return content;
+ }
+ ByteBuffer copy = wrap(toArray(content.getByteBuffer()));
+ content.skip(copy.remaining());
+ return new Content(copy);
+ }
+
+ case 2:
+ // byte by byte
+ if (content.isEmpty())
+ {
+ state++;
+ return content;
+ }
+ byte[] b = new byte[1];
+ int l = content.get(b,0,1);
+ return new Content(wrap(b,0,l));
+
+ case 3:
+ {
+ // double vision
+ if (content.isEmpty())
+ {
+ if (saved==null)
+ {
+ state++;
+ return content;
+ }
+ Content copy = saved;
+ saved=null;
+ return copy;
+ }
+
+ byte[] data = toArray(content.getByteBuffer());
+ content.skip(data.length);
+ saved = new Content(wrap(data));
+ return new Content(wrap(data));
+ }
+
+ default:
+ return null;
+ }
+ }
+ });
+
+ AsyncContext asyncContext = request.startAsync();
+ ServletInputStream input = request.getInputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ input.setReadListener(new ReadListener()
+ {
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ while (input.isReady() && !input.isFinished())
+ {
+ int b = input.read();
+ if (b>0)
+ {
+ // System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
+ out.write(b);
+ }
+ else
+ onAllDataRead();
+ }
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ response.getOutputStream().write(out.toByteArray());
+ asyncContext.complete();
+ }
+
+ @Override
+ public void onError(Throwable x)
+ {
+ }
+ });
+ }
+ });
+
+ DeferredContentProvider contentProvider = new DeferredContentProvider();
+ CountDownLatch clientLatch = new CountDownLatch(1);
+
+ String expected =
+ "S0" +
+ "S1" +
+ "S2" +
+ "S3S3" +
+ "S4" +
+ "S5" +
+ "S6";
+
+ client.newRequest(newURI())
+ .method(HttpMethod.POST)
+ .path(servletPath)
+ .content(contentProvider)
+ .send(new BufferingResponseListener()
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ if (result.isSucceeded())
+ {
+ Response response = result.getResponse();
+ assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
+ assertThat(getContentAsString(), Matchers.equalTo(expected));
+ clientLatch.countDown();
+ }
+ }
+ });
+
+ contentProvider.offer(BufferUtil.toBuffer("S0"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S1"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S2"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S3"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S4"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S5"));
+ contentProvider.flush();
+ contentProvider.offer(BufferUtil.toBuffer("S6"));
+ contentProvider.close();
+
+
+ Assert.assertTrue(clientLatch.await(10,TimeUnit.SECONDS));
+
+
+ }
+
}