From 98bb582d45238959c80eb820f9bfeca8c28e6dbf Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 5 Oct 2016 13:49:20 +1100 Subject: [PATCH 1/2] Jetty 9.4.x http interceptor #382 * Issue #382 Request compression Added identity HttpInput.Interceptor Moved GZIPContentDecoder to jetty-http Reworking interceptor and GZIPContentDecoder to avoid data copies Completed and tested GZIPContentDecoder Implemented GzipHttpInputInterceptor updated GzipHandler.java updated gzip module use common GZIP decoder Gzip Bomb handle read() after empty interception --- .../jetty/client/GZIPContentDecoder.java | 302 +---------- .../jetty/client/GZIPContentDecoderTest.java | 1 + jetty-http/pom.xml | 5 + .../jetty/http/GZIPContentDecoder.java | 416 +++++++++++++++ .../jetty/http/GZIPContentDecoderTest.java | 343 +++++++++++++ .../src/main/config/etc/jetty-gzip.xml | 2 + jetty-server/src/main/config/modules/gzip.mod | 3 + .../org/eclipse/jetty/server/HttpInput.java | 479 +++++++++++------- .../org/eclipse/jetty/server/HttpOutput.java | 4 +- .../server/handler/gzip/GzipHandler.java | 33 ++ .../gzip/GzipHttpInputInterceptor.java | 83 +++ .../jetty/server/AsyncRequestReadTest.java | 1 + .../jetty/servlet/GzipHandlerTest.java | 84 +++ .../jetty/http/client/AsyncIOServletTest.java | 176 +++++++ 14 files changed, 1459 insertions(+), 473 deletions(-) create mode 100644 jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java create mode 100644 jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java create mode 100644 jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java index 8bae3b8a993..c469d2e4719 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/GZIPContentDecoder.java @@ -18,26 +18,13 @@ package org.eclipse.jetty.client; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; -import java.util.zip.ZipException; - -import org.eclipse.jetty.util.BufferUtil; /** * {@link ContentDecoder} for the "gzip" encoding. + * */ -public class GZIPContentDecoder implements ContentDecoder +public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder { - private final Inflater inflater = new Inflater(true); - private final byte[] bytes; - private byte[] output; - private State state; - private int size; - private int value; - private byte flags; public GZIPContentDecoder() { @@ -46,285 +33,7 @@ public class GZIPContentDecoder implements ContentDecoder public GZIPContentDecoder(int bufferSize) { - this.bytes = new byte[bufferSize]; - reset(); - } - - /** - * {@inheritDoc} - *

If the decoding did not produce any output, for example because it consumed gzip header - * or trailer bytes, it returns a buffer with zero capacity.

- *

This method never returns null.

- *

The given {@code buffer}'s position will be modified to reflect the bytes consumed during - * the decoding.

- *

The decoding may be finished without consuming the buffer completely if the buffer contains - * gzip bytes plus other bytes (either plain or gzipped).

- */ - @Override - public ByteBuffer decode(ByteBuffer buffer) - { - try - { - while (buffer.hasRemaining()) - { - byte currByte = buffer.get(); - switch (state) - { - case INITIAL: - { - buffer.position(buffer.position() - 1); - state = State.ID; - break; - } - case ID: - { - value += (currByte & 0xFF) << 8 * size; - ++size; - if (size == 2) - { - if (value != 0x8B1F) - throw new ZipException("Invalid gzip bytes"); - state = State.CM; - } - break; - } - case CM: - { - if ((currByte & 0xFF) != 0x08) - throw new ZipException("Invalid gzip compression method"); - state = State.FLG; - break; - } - case FLG: - { - flags = currByte; - state = State.MTIME; - size = 0; - value = 0; - break; - } - case MTIME: - { - // Skip the 4 MTIME bytes - ++size; - if (size == 4) - state = State.XFL; - break; - } - case XFL: - { - // Skip XFL - state = State.OS; - break; - } - case OS: - { - // Skip OS - state = State.FLAGS; - break; - } - case FLAGS: - { - buffer.position(buffer.position() - 1); - if ((flags & 0x04) == 0x04) - { - state = State.EXTRA_LENGTH; - size = 0; - value = 0; - } - else if ((flags & 0x08) == 0x08) - state = State.NAME; - else if ((flags & 0x10) == 0x10) - state = State.COMMENT; - else if ((flags & 0x2) == 0x2) - { - state = State.HCRC; - size = 0; - value = 0; - } - else - state = State.DATA; - break; - } - case EXTRA_LENGTH: - { - value += (currByte & 0xFF) << 8 * size; - ++size; - if (size == 2) - state = State.EXTRA; - break; - } - case EXTRA: - { - // Skip EXTRA bytes - --value; - if (value == 0) - { - // Clear the EXTRA flag and loop on the flags - flags &= ~0x04; - state = State.FLAGS; - } - break; - } - case NAME: - { - // Skip NAME bytes - if (currByte == 0) - { - // Clear the NAME flag and loop on the flags - flags &= ~0x08; - state = State.FLAGS; - } - break; - } - case COMMENT: - { - // Skip COMMENT bytes - if (currByte == 0) - { - // Clear the COMMENT flag and loop on the flags - flags &= ~0x10; - state = State.FLAGS; - } - break; - } - case HCRC: - { - // Skip HCRC - ++size; - if (size == 2) - { - // Clear the HCRC flag and loop on the flags - flags &= ~0x02; - state = State.FLAGS; - } - break; - } - case DATA: - { - buffer.position(buffer.position() - 1); - while (true) - { - int decoded = inflate(bytes); - if (decoded == 0) - { - if (inflater.needsInput()) - { - if (buffer.hasRemaining()) - { - byte[] input = new byte[buffer.remaining()]; - buffer.get(input); - inflater.setInput(input); - } - else - { - if (output != null) - { - ByteBuffer result = ByteBuffer.wrap(output); - output = null; - return result; - } - break; - } - } - else if (inflater.finished()) - { - int remaining = inflater.getRemaining(); - buffer.position(buffer.limit() - remaining); - state = State.CRC; - size = 0; - value = 0; - break; - } - else - { - throw new ZipException("Invalid inflater state"); - } - } - else - { - if (output == null) - { - // Save the inflated bytes and loop to see if we have finished - output = Arrays.copyOf(bytes, decoded); - } - else - { - // Accumulate inflated bytes and loop to see if we have finished - byte[] newOutput = Arrays.copyOf(output, output.length + decoded); - System.arraycopy(bytes, 0, newOutput, output.length, decoded); - output = newOutput; - } - } - } - break; - } - case CRC: - { - value += (currByte & 0xFF) << 8 * size; - ++size; - if (size == 4) - { - // From RFC 1952, compliant decoders need not to verify the CRC - state = State.ISIZE; - size = 0; - value = 0; - } - break; - } - case ISIZE: - { - value += (currByte & 0xFF) << 8 * size; - ++size; - if (size == 4) - { - if (value != inflater.getBytesWritten()) - throw new ZipException("Invalid input size"); - - ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output); - reset(); - return result; - } - break; - } - default: - throw new ZipException(); - } - } - return BufferUtil.EMPTY_BUFFER; - } - catch (ZipException x) - { - throw new RuntimeException(x); - } - } - - private int inflate(byte[] bytes) throws ZipException - { - try - { - return inflater.inflate(bytes); - } - catch (DataFormatException x) - { - throw new ZipException(x.getMessage()); - } - } - - private void reset() - { - inflater.reset(); - Arrays.fill(bytes, (byte)0); - output = null; - state = State.INITIAL; - size = 0; - value = 0; - flags = 0; - } - - protected boolean isFinished() - { - return state == State.INITIAL; + super(null,bufferSize); } /** @@ -351,9 +60,4 @@ public class GZIPContentDecoder implements ContentDecoder return new GZIPContentDecoder(bufferSize); } } - - private enum State - { - INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java index 9c1b460d02a..207f6d3e1fe 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/GZIPContentDecoderTest.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.TestTracker; import org.junit.Rule; import org.junit.Test; +@Deprecated public class GZIPContentDecoderTest { @Rule diff --git a/jetty-http/pom.xml b/jetty-http/pom.xml index 8455a9d3248..9802a9f8a20 100644 --- a/jetty-http/pom.xml +++ b/jetty-http/pom.xml @@ -18,6 +18,11 @@ jetty-util ${project.version} + + org.eclipse.jetty + jetty-io + ${project.version} + org.eclipse.jetty.toolchain jetty-test-helper diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java new file mode 100644 index 00000000000..49f5aa5b746 --- /dev/null +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java @@ -0,0 +1,416 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http; + +import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; +import java.util.zip.ZipException; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; + +/** + * Decoder for the "gzip" encoding. + *

An Decode that inflates gzip compressed data that has been + * optimized for async usage with minimal data copies. + * + */ +public class GZIPContentDecoder +{ + private final Inflater _inflater = new Inflater(true); + private final ByteBufferPool _pool; + private final int _bufferSize; + private State _state; + private int _size; + private int _value; + private byte _flags; + private ByteBuffer _inflated; + + public GZIPContentDecoder() + { + this(null,2048); + } + + public GZIPContentDecoder(int bufferSize) + { + this(null,bufferSize); + } + + public GZIPContentDecoder(ByteBufferPool pool, int bufferSize) + { + _bufferSize = bufferSize; + _pool = pool; + reset(); + } + + /** Inflate compressed data from a buffer. + * + * @param compressed Buffer containing compressed data. + * @return Buffer containing inflated data. + */ + public ByteBuffer decode(ByteBuffer compressed) + { + decodeChunks(compressed); + if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE ) + return BufferUtil.EMPTY_BUFFER; + + ByteBuffer result = _inflated; + _inflated = null; + return result; + } + + /** Called when a chunk of data is inflated. + *

The default implementation aggregates all the chunks + * into a single buffer returned from {@link #decode(ByteBuffer)}. + * Derived implementations may choose to consume chunks individually + * and return false to prevent further inflation until a subsequent + * call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}. + * + * @param chunk The inflated chunk of data + * @return False if inflating should continue, or True if the call + * to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)} + * should return, allowing back pressure of compressed data. + */ + protected boolean decodedChunk(ByteBuffer chunk) + { + if (_inflated==null) + _inflated=chunk; + else + { + int size = _inflated.remaining() + chunk.remaining(); + if (size<=_inflated.capacity()) + { + BufferUtil.append(_inflated,chunk); + BufferUtil.put(chunk,_inflated); + release(chunk); + } + else + { + ByteBuffer bigger=_pool==null?BufferUtil.allocate(size):_pool.acquire(size,false); + int pos=BufferUtil.flipToFill(bigger); + BufferUtil.put(_inflated,bigger); + BufferUtil.put(chunk,bigger); + BufferUtil.flipToFlush(bigger,pos); + release(_inflated); + release(chunk); + _inflated = bigger; + } + } + + return false; + } + + + /** + * Inflate compressed data. + *

Inflation continues until the compressed block end is reached, there is no + * more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true. + * @param compressed Buffer of compressed data to inflate + */ + protected void decodeChunks(ByteBuffer compressed) + { + ByteBuffer buffer = null; + try + { + while (true) + { + switch (_state) + { + case INITIAL: + { + _state = State.ID; + break; + } + + case FLAGS: + { + if ((_flags & 0x04) == 0x04) + { + _state = State.EXTRA_LENGTH; + _size = 0; + _value = 0; + } + else if ((_flags & 0x08) == 0x08) + _state = State.NAME; + else if ((_flags & 0x10) == 0x10) + _state = State.COMMENT; + else if ((_flags & 0x2) == 0x2) + { + _state = State.HCRC; + _size = 0; + _value = 0; + } + else + { + _state = State.DATA; + continue; + } + break; + } + + case DATA: + { + while (true) + { + if (buffer==null) + buffer = acquire(); + + try + { + int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity()); + buffer.limit(length); + } + catch (DataFormatException x) + { + throw new ZipException(x.getMessage()); + } + + if (buffer.hasRemaining()) + { + ByteBuffer chunk = buffer; + buffer = null; + if (decodedChunk(chunk)) + return; + } + else if (_inflater.needsInput()) + { + if (!compressed.hasRemaining()) + return; + if (compressed.hasArray()) + { + _inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining()); + compressed.position(compressed.limit()); + } + else + { + // TODO use the pool + byte[] input = new byte[compressed.remaining()]; + compressed.get(input); + _inflater.setInput(input); + } + } + else if (_inflater.finished()) + { + int remaining = _inflater.getRemaining(); + compressed.position(compressed.limit() - remaining); + _state = State.CRC; + _size = 0; + _value = 0; + break; + } + } + continue; + } + + default: + break; + } + + if (!compressed.hasRemaining()) + break; + + byte currByte = compressed.get(); + switch (_state) + { + case ID: + { + _value += (currByte & 0xFF) << 8 * _size; + ++_size; + if (_size == 2) + { + if (_value != 0x8B1F) + throw new ZipException("Invalid gzip bytes"); + _state = State.CM; + } + break; + } + case CM: + { + if ((currByte & 0xFF) != 0x08) + throw new ZipException("Invalid gzip compression method"); + _state = State.FLG; + break; + } + case FLG: + { + _flags = currByte; + _state = State.MTIME; + _size = 0; + _value = 0; + break; + } + case MTIME: + { + // Skip the 4 MTIME bytes + ++_size; + if (_size == 4) + _state = State.XFL; + break; + } + case XFL: + { + // Skip XFL + _state = State.OS; + break; + } + case OS: + { + // Skip OS + _state = State.FLAGS; + break; + } + case EXTRA_LENGTH: + { + _value += (currByte & 0xFF) << 8 * _size; + ++_size; + if (_size == 2) + _state = State.EXTRA; + break; + } + case EXTRA: + { + // Skip EXTRA bytes + --_value; + if (_value == 0) + { + // Clear the EXTRA flag and loop on the flags + _flags &= ~0x04; + _state = State.FLAGS; + } + break; + } + case NAME: + { + // Skip NAME bytes + if (currByte == 0) + { + // Clear the NAME flag and loop on the flags + _flags &= ~0x08; + _state = State.FLAGS; + } + break; + } + case COMMENT: + { + // Skip COMMENT bytes + if (currByte == 0) + { + // Clear the COMMENT flag and loop on the flags + _flags &= ~0x10; + _state = State.FLAGS; + } + break; + } + case HCRC: + { + // Skip HCRC + ++_size; + if (_size == 2) + { + // Clear the HCRC flag and loop on the flags + _flags &= ~0x02; + _state = State.FLAGS; + } + break; + } + case CRC: + { + _value += (currByte & 0xFF) << 8 * _size; + ++_size; + if (_size == 4) + { + // From RFC 1952, compliant decoders need not to verify the CRC + _state = State.ISIZE; + _size = 0; + _value = 0; + } + break; + } + case ISIZE: + { + _value += (currByte & 0xFF) << 8 * _size; + ++_size; + if (_size == 4) + { + if (_value != _inflater.getBytesWritten()) + throw new ZipException("Invalid input size"); + + // TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output); + reset(); + return ; + } + break; + } + default: + throw new ZipException(); + } + } + } + catch (ZipException x) + { + throw new RuntimeException(x); + } + finally + { + if (buffer!=null) + release(buffer); + } + } + + private void reset() + { + _inflater.reset(); + _state = State.INITIAL; + _size = 0; + _value = 0; + _flags = 0; + } + + public boolean isFinished() + { + return _state == State.INITIAL; + } + + private enum State + { + INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE + } + + /** + * @return An indirect buffer of the configured buffersize either from the pool or freshly allocated. + */ + public ByteBuffer acquire() + { + return _pool==null?BufferUtil.allocate(_bufferSize):_pool.acquire(_bufferSize,false); + } + + /** + * Release an allocated buffer. + *

This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has + * been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)} + * or passed to {@link #decodedChunk(ByteBuffer)}. + * @param buffer The buffer to release. + */ + public void release(ByteBuffer buffer) + { + if (_pool!=null && buffer!=BufferUtil.EMPTY_BUFFER) + _pool.release(buffer); + } +} diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java new file mode 100644 index 00000000000..2c2d7195baa --- /dev/null +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/GZIPContentDecoderTest.java @@ -0,0 +1,343 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + + +public class GZIPContentDecoderTest +{ + @Rule + public final TestTracker tracker = new TestTracker(); + + + ArrayByteBufferPool pool; + AtomicInteger buffers = new AtomicInteger(0); + + @Before + public void beforeClass() throws Exception + { + buffers.set(0); + pool = new ArrayByteBufferPool() + { + + @Override + public ByteBuffer acquire(int size, boolean direct) + { + buffers.incrementAndGet(); + return super.acquire(size,direct); + } + + @Override + public void release(ByteBuffer buffer) + { + buffers.decrementAndGet(); + super.release(buffer); + } + + }; + } + + @After + public void afterClass() throws Exception + { + assertEquals(0,buffers.get()); + } + + + @Test + public void testStreamNoBlocks() throws Exception + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.close(); + byte[] bytes = baos.toByteArray(); + + GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1); + int read = input.read(); + assertEquals(-1, read); + } + + @Test + public void testStreamBigBlockOneByteAtATime() throws Exception + { + String data = "0123456789ABCDEF"; + for (int i = 0; i < 10; ++i) + data += data; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + baos = new ByteArrayOutputStream(); + GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1); + int read; + while ((read = input.read()) >= 0) + baos.write(read); + assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8)); + } + + @Test + public void testNoBlocks() throws Exception + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.close(); + byte[] bytes = baos.toByteArray(); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); + assertEquals(0, decoded.remaining()); + } + + @Test + public void testSmallBlock() throws Exception + { + String data = "0"; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes)); + assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); + decoder.release(decoded); + } + + @Test + public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception + { + String data = "0"; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + // The header is 10 bytes, chunk at 11 bytes + byte[] bytes1 = new byte[11]; + System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); + byte[] bytes2 = new byte[bytes.length - bytes1.length]; + System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); + assertEquals(0, decoded.capacity()); + decoded = decoder.decode(ByteBuffer.wrap(bytes2)); + assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); + decoder.release(decoded); + } + + @Test + public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception + { + String data = "0"; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + // The trailer is 8 bytes, chunk the last 9 bytes + byte[] bytes1 = new byte[bytes.length - 9]; + System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); + byte[] bytes2 = new byte[bytes.length - bytes1.length]; + System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); + assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); + assertFalse(decoder.isFinished()); + decoder.release(decoded); + decoded = decoder.decode(ByteBuffer.wrap(bytes2)); + assertEquals(0, decoded.remaining()); + assertTrue(decoder.isFinished()); + decoder.release(decoded); + } + + @Test + public void testSmallBlockWithGZIPTrailerChunked() throws Exception + { + String data = "0"; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + // The trailer is 4+4 bytes, chunk the last 3 bytes + byte[] bytes1 = new byte[bytes.length - 3]; + System.arraycopy(bytes, 0, bytes1, 0, bytes1.length); + byte[] bytes2 = new byte[bytes.length - bytes1.length]; + System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1)); + assertEquals(0, decoded.capacity()); + decoder.release(decoded); + decoded = decoder.decode(ByteBuffer.wrap(bytes2)); + assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString()); + decoder.release(decoded); + } + + @Test + public void testTwoSmallBlocks() throws Exception + { + String data1 = "0"; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data1.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes1 = baos.toByteArray(); + + String data2 = "1"; + baos = new ByteArrayOutputStream(); + output = new GZIPOutputStream(baos); + output.write(data2.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes2 = baos.toByteArray(); + + byte[] bytes = new byte[bytes1.length + bytes2.length]; + System.arraycopy(bytes1, 0, bytes, 0, bytes1.length); + System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length); + + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + ByteBuffer decoded = decoder.decode(buffer); + assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString()); + assertTrue(decoder.isFinished()); + assertTrue(buffer.hasRemaining()); + decoder.release(decoded); + decoded = decoder.decode(buffer); + assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString()); + assertTrue(decoder.isFinished()); + assertFalse(buffer.hasRemaining()); + decoder.release(decoded); + } + + @Test + public void testBigBlock() throws Exception + { + String data = "0123456789ABCDEF"; + for (int i = 0; i < 10; ++i) + data += data; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + String result = ""; + GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + while (buffer.hasRemaining()) + { + ByteBuffer decoded = decoder.decode(buffer); + result += StandardCharsets.UTF_8.decode(decoded).toString(); + decoder.release(decoded); + } + assertEquals(data, result); + } + + @Test + public void testBigBlockOneByteAtATime() throws Exception + { + String data = "0123456789ABCDEF"; + for (int i = 0; i < 10; ++i) + data += data; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + String result = ""; + GZIPContentDecoder decoder = new GZIPContentDecoder(64); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + while (buffer.hasRemaining()) + { + ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()})); + if (decoded.hasRemaining()) + result += StandardCharsets.UTF_8.decode(decoded).toString(); + decoder.release(decoded); + } + assertEquals(data, result); + assertTrue(decoder.isFinished()); + } + + @Test + public void testBigBlockWithExtraBytes() throws Exception + { + String data1 = "0123456789ABCDEF"; + for (int i = 0; i < 10; ++i) + data1 += data1; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data1.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes1 = baos.toByteArray(); + + String data2 = "HELLO"; + byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8); + + byte[] bytes = new byte[bytes1.length + bytes2.length]; + System.arraycopy(bytes1, 0, bytes, 0, bytes1.length); + System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length); + + String result = ""; + GZIPContentDecoder decoder = new GZIPContentDecoder(64); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + while (buffer.hasRemaining()) + { + ByteBuffer decoded = decoder.decode(buffer); + if (decoded.hasRemaining()) + result += StandardCharsets.UTF_8.decode(decoded).toString(); + decoder.release(decoded); + if (decoder.isFinished()) + break; + } + assertEquals(data1, result); + assertTrue(buffer.hasRemaining()); + assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString()); + } +} diff --git a/jetty-server/src/main/config/etc/jetty-gzip.xml b/jetty-server/src/main/config/etc/jetty-gzip.xml index 93b41fd6396..6e5573449e7 100644 --- a/jetty-server/src/main/config/etc/jetty-gzip.xml +++ b/jetty-server/src/main/config/etc/jetty-gzip.xml @@ -15,6 +15,8 @@ + diff --git a/jetty-server/src/main/config/modules/gzip.mod b/jetty-server/src/main/config/modules/gzip.mod index 65663a16066..d0173efa60f 100644 --- a/jetty-server/src/main/config/modules/gzip.mod +++ b/jetty-server/src/main/config/modules/gzip.mod @@ -20,3 +20,6 @@ etc/jetty-gzip.xml ## User agents for which gzip is disabled # jetty.gzip.excludedUserAgent=.*MSIE.6\.0.* + +## Inflate request buffer size, or 0 for no request inflation +# jetty.gzip.inflateBufferSize=0 diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index ffc727f060e..c090daab2d0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -25,6 +25,7 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -39,23 +40,75 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Invocable.InvocationType; /** * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}. *

- * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class - * maintains two states: the content state that tells whether there is content to consume and the EOF - * state that tells whether an EOF has arrived. - * Only once the content has been consumed the content state is moved to the EOF state. + * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class maintains two states: the content state that tells + * whether there is content to consume and the EOF state that tells whether an EOF has arrived. Only once the content has been consumed the content state is + * moved to the EOF state. + */ +/** + * @author gregw + * */ public class HttpInput extends ServletInputStream implements Runnable { + /** + * An interceptor for HTTP Request input. + *

+ * Unlike inputstream wrappers that can be applied by filters, an interceptor + * is entirely transparent and works with async IO APIs. + * @see HttpInput#setInterceptor(Interceptor) + * @see HttpInput#addInterceptor(Interceptor) + */ + public interface Interceptor + { + Content readFrom(Content content); + } + + /** + * An {@link Interceptor} that chains two other {@link Interceptor}s together. + * The {@link #readFrom(Content)} calls the previous {@link Interceptor}'s + * {@link #readFrom(Content)} and then passes any {@link Content} returned + * to the next {@link Interceptor}. + */ + public static class ChainedInterceptor implements Interceptor + { + private final Interceptor _prev; + private final Interceptor _next; + + public ChainedInterceptor(Interceptor prev, Interceptor next) + { + _prev = prev; + _next = next; + } + + public Interceptor getPrev() + { + return _prev; + } + + public Interceptor getNext() + { + return _next; + } + + @Override + public Content readFrom(Content content) + { + return _next.readFrom(_prev.readFrom(content)); + } + } + + private final static Logger LOG = Log.getLogger(HttpInput.class); private final static Content EOF_CONTENT = new EofContent("EOF"); private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF"); private final byte[] _oneByteBuffer = new byte[1]; + private Content _content; + private Content _intercepted; private final Deque _inputQ = new ArrayDeque<>(); private final HttpChannelState _channelState; private ReadListener _listener; @@ -64,6 +117,7 @@ public class HttpInput extends ServletInputStream implements Runnable private long _contentArrived; private long _contentConsumed; private long _blockUntil; + private Interceptor _interceptor; public HttpInput(HttpChannelState state) { @@ -79,6 +133,9 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { + if (_content!=null) + _content.failed(null); + _content = null; Content item = _inputQ.poll(); while (item != null) { @@ -91,9 +148,40 @@ public class HttpInput extends ServletInputStream implements Runnable _contentConsumed = 0; _firstByteTimeStamp = -1; _blockUntil = 0; + _interceptor = null; } } + /** + * @return The current Interceptor, or null if none set + */ + public Interceptor getInterceptor() + { + return _interceptor; + } + + /** + * Set the interceptor. + * @param interceptor The interceptor to use. + */ + public void setInterceptor(Interceptor interceptor) + { + _interceptor = interceptor; + } + + /** + * Set the {@link Interceptor}, using a {@link ChainedInterceptor} if + * an {@link Interceptor} is already set. + * @param interceptor the next {@link Interceptor} in a chain + */ + public void addInterceptor(Interceptor interceptor) + { + if (_interceptor == null) + _interceptor = interceptor; + else + _interceptor = new ChainedInterceptor(_interceptor,interceptor); + } + @Override public int available() { @@ -101,8 +189,9 @@ public class HttpInput extends ServletInputStream implements Runnable boolean woken = false; synchronized (_inputQ) { - Content content = _inputQ.peek(); - if (content == null) + if (_content == null) + _content = _inputQ.poll(); + if (_content == null) { try { @@ -112,11 +201,12 @@ public class HttpInput extends ServletInputStream implements Runnable { woken = failed(e); } - content = _inputQ.peek(); + if (_content == null) + _content = _inputQ.poll(); } - if (content != null) - available = remaining(content); + if (_content != null) + available = _content.remaining(); } if (woken) @@ -139,10 +229,10 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public int read() throws IOException { - int read = read(_oneByteBuffer, 0, 1); + int read = read(_oneByteBuffer,0,1); if (read == 0) throw new IllegalStateException("unready read=0"); - return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + return read < 0?-1:_oneByteBuffer[0] & 0xFF; } @Override @@ -168,7 +258,7 @@ public class HttpInput extends ServletInputStream implements Runnable { long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); if (_contentArrived < minimum_data) - throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate)); + throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate)); } } @@ -177,11 +267,12 @@ public class HttpInput extends ServletInputStream implements Runnable Content item = nextContent(); if (item != null) { - int l = get(item, b, off, len); + int l = get(item,b,off,len); if (LOG.isDebugEnabled()) - LOG.debug("{} read {} from {}", this, l, item); + LOG.debug("{} read {} from {}",this,l,item); - consumeNonContent(); + // Consume any following poison pills + pollReadableContent(); return l; } @@ -193,191 +284,207 @@ public class HttpInput extends ServletInputStream implements Runnable } /** - * Called when derived implementations should attempt to - * produce more Content and add it via {@link #addContent(Content)}. - * For protocols that are constantly producing (eg HTTP2) this can - * be left as a noop; + * Called when derived implementations should attempt to produce more Content and add it via {@link #addContent(Content)}. For protocols that are constantly + * producing (eg HTTP2) this can be left as a noop; * - * @throws IOException if unable to produce content + * @throws IOException + * if unable to produce content */ protected void produceContent() throws IOException { } /** - * Get the next content from the inputQ, calling {@link #produceContent()} - * if need be. EOF is processed and state changed. + * Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed. * * @return the content or null if none available. - * @throws IOException if retrieving the content fails + * @throws IOException + * if retrieving the content fails */ protected Content nextContent() throws IOException { - Content content = pollContent(); + Content content = pollNonEmptyContent(); if (content == null && !isFinished()) { produceContent(); - content = pollContent(); + content = pollNonEmptyContent(); } return content; } /** - * Poll the inputQ for Content. - * Consumed buffers and {@link PoisonPillContent}s are removed and - * EOF state updated if need be. + * Poll the inputQ for Content. Consumed buffers and {@link SentinelContent}s are removed and EOF state updated if need be. * * @return Content or null */ - protected Content pollContent() + protected Content pollNonEmptyContent() { - // Items are removed only when they are fully consumed. - Content content = _inputQ.peek(); - // Skip consumed items at the head of the queue. - while (content != null && remaining(content) == 0) - { - _inputQ.poll(); - content.succeeded(); - if (LOG.isDebugEnabled()) - LOG.debug("{} consumed {}", this, content); - - if (content == EOF_CONTENT) + while (true) + { + // Get the next content (or EOF) + Content content = pollReadableContent(); + + // If it is EOF, consume it here + if (content instanceof SentinelContent) { - if (_listener == null) - _state = EOF; - else + if (content == EARLY_EOF_CONTENT) + _state = EARLY_EOF; + else if (content instanceof EofContent) { - _state = AEOF; - boolean woken = _channelState.onReadReady(); // force callback? - if (woken) - wake(); + if (_listener == null) + _state = EOF; + else + { + _state = AEOF; + boolean woken = _channelState.onReadReady(); // force callback? + if (woken) + wake(); + } } + + // Consume the EOF content, either if it was original content + // or if it was produced by interception + content.succeeded(); + if (_content==content) + _content = null; + else if (_intercepted==content) + _intercepted = null; + continue; } - else if (content == EARLY_EOF_CONTENT) - _state = EARLY_EOF; - content = _inputQ.peek(); + return content; } - - return content; + } /** + * Poll the inputQ for Content or EOF. Consumed buffers and non EOF {@link SentinelContent}s are removed. EOF state is not updated. + * Interception is done within this method. + * @return Content with remaining, a {@link SentinelContent}, or null */ - protected void consumeNonContent() + protected Content pollReadableContent() { - // Items are removed only when they are fully consumed. - Content content = _inputQ.peek(); - // Skip consumed items at the head of the queue. - while (content != null && remaining(content) == 0) + // If we have a chunk produced by interception + if (_intercepted!=null) { - // Defer EOF until read - if (content instanceof EofContent) - break; - - // Consume all other empty content - _inputQ.poll(); - content.succeeded(); - if (LOG.isDebugEnabled()) - LOG.debug("{} consumed {}", this, content); - content = _inputQ.peek(); + // Use it if it has any remaining content + if (_intercepted.hasContent()) + return _intercepted; + + // succeed the chunk + _intercepted.succeeded(); + _intercepted=null; } + + // If we don't have a Content under consideration, get + // the next one off the input Q. + if (_content == null) + _content = _inputQ.poll(); + + // While we have content to consider. + while (_content!=null) + { + // Are we intercepting? + if (_interceptor!=null) + { + // Intercept the current content (may be called several + // times for the same content + _intercepted = _interceptor.readFrom(_content); + + // If interception produced new content + if (_intercepted!=null && _intercepted!=_content) + { + // if it is not empty use it + if (_intercepted.hasContent()) + return _intercepted; + _intercepted.succeeded(); + } + + // intercepted content consumed + _intercepted=null; + + // fall through so that the unintercepted _content is + // considered for any remaining content, for EOF and to + // succeed it if it is entirely consumed. + } + + // If the content has content or is an EOF marker, use it + if (_content.hasContent() || _content instanceof SentinelContent) + return _content; + + // The content is consumed, so get the next one. Note that EOF + // content is never consumed here, but in #pollContent + _content.succeeded(); + _content = _inputQ.poll(); + } + + return null; + } + /** - * Get the next readable from the inputQ, calling {@link #produceContent()} - * if need be. EOF is NOT processed and state is not changed. + * Get the next readable from the inputQ, calling {@link #produceContent()} if need be. EOF is NOT processed and state is not changed. * * @return the content or EOF or null if none available. - * @throws IOException if retrieving the content fails + * @throws IOException + * if retrieving the content fails */ protected Content nextReadable() throws IOException { - Content content = pollReadable(); + Content content = pollReadableContent(); if (content == null && !isFinished()) { produceContent(); - content = pollReadable(); + content = pollReadableContent(); } return content; } - /** - * Poll the inputQ for Content or EOF. - * Consumed buffers and non EOF {@link PoisonPillContent}s are removed. - * EOF state is not updated. - * - * @return Content, EOF or null - */ - protected Content pollReadable() - { - // Items are removed only when they are fully consumed. - Content content = _inputQ.peek(); - - // Skip consumed items at the head of the queue except EOF - while (content != null) - { - if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0) - return content; - - _inputQ.poll(); - content.succeeded(); - if (LOG.isDebugEnabled()) - LOG.debug("{} consumed {}", this, content); - content = _inputQ.peek(); - } - - return null; - } - - /** - * @param item the content - * @return how many bytes remain in the given content - */ - protected int remaining(Content item) - { - return item.remaining(); - } /** * Copies the given content into the given byte buffer. * - * @param content the content to copy from - * @param buffer the buffer to copy into - * @param offset the buffer offset to start copying from - * @param length the space available in the buffer + * @param content + * the content to copy from + * @param buffer + * the buffer to copy into + * @param offset + * the buffer offset to start copying from + * @param length + * the space available in the buffer * @return the number of bytes actually copied */ protected int get(Content content, byte[] buffer, int offset, int length) { - int l = Math.min(content.remaining(), length); - content.getContent().get(buffer, offset, l); + int l = content.get(buffer,offset,length); _contentConsumed += l; return l; } /** - * Consumes the given content. - * Calls the content succeeded if all content consumed. + * Consumes the given content. Calls the content succeeded if all content consumed. * - * @param content the content to consume - * @param length the number of bytes to consume + * @param content + * the content to consume + * @param length + * the number of bytes to consume */ protected void skip(Content content, int length) { - int l = Math.min(content.remaining(), length); - ByteBuffer buffer = content.getContent(); - buffer.position(buffer.position() + l); + int l = content.skip(length); + _contentConsumed += l; - if (l > 0 && !content.hasContent()) - pollContent(); // hungry succeed + if (l > 0 && content.isEmpty()) + pollNonEmptyContent(); // hungry succeed } /** * Blocks until some content or some end-of-file event arrives. * - * @throws IOException if the wait is interrupted + * @throws IOException + * if the wait is interrupted */ protected void blockForContent() throws IOException { @@ -392,7 +499,7 @@ public class HttpInput extends ServletInputStream implements Runnable } if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={}", this, timeout); + LOG.debug("{} blocking for content timeout={}",this,timeout); if (timeout > 0) _inputQ.wait(timeout); else @@ -402,7 +509,7 @@ public class HttpInput extends ServletInputStream implements Runnable // TODO: so spurious wakeups are not handled correctly. if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0) - throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout())); + throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout())); } catch (Throwable e) { @@ -412,11 +519,12 @@ public class HttpInput extends ServletInputStream implements Runnable /** * Adds some content to the start of this input stream. - *

Typically used to push back content that has - * been read, perhaps mutated. The bytes prepended are - * deducted for the contentConsumed total

+ *

+ * Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total + *

* - * @param item the content to add + * @param item + * the content to add * @return true if content channel woken for read */ public boolean prependContent(Content item) @@ -424,44 +532,54 @@ public class HttpInput extends ServletInputStream implements Runnable boolean woken = false; synchronized (_inputQ) { - _inputQ.push(item); + if (_content != null) + _inputQ.push(_content); + _content = item; _contentConsumed -= item.remaining(); if (LOG.isDebugEnabled()) - LOG.debug("{} prependContent {}", this, item); + LOG.debug("{} prependContent {}",this,item); if (_listener == null) _inputQ.notify(); else woken = _channelState.onReadPossible(); } - return woken; } /** * Adds some content to this input stream. * - * @param item the content to add + * @param content + * the content to add * @return true if content channel woken for read */ - public boolean addContent(Content item) + public boolean addContent(Content content) { boolean woken = false; synchronized (_inputQ) { if (_firstByteTimeStamp == -1) _firstByteTimeStamp = System.nanoTime(); - _contentArrived += item.remaining(); - _inputQ.offer(item); - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}", this, item); - if (_listener == null) - _inputQ.notify(); + _contentArrived += content.remaining(); + + if (_content==null && _inputQ.isEmpty()) + _content=content; else - woken = _channelState.onReadPossible(); - } + _inputQ.offer(content); + + if (LOG.isDebugEnabled()) + LOG.debug("{} addContent {}",this,content); + if (pollReadableContent()!=null) + { + if (_listener == null) + _inputQ.notify(); + else + woken = _channelState.onReadPossible(); + } + } return woken; } @@ -469,7 +587,7 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - return _inputQ.size() > 0; + return _content!=null || _inputQ.size() > 0; } } @@ -490,11 +608,9 @@ public class HttpInput extends ServletInputStream implements Runnable } /** - * This method should be called to signal that an EOF has been - * detected before all the expected content arrived. + * This method should be called to signal that an EOF has been detected before all the expected content arrived. *

- * Typically this will result in an EOFException being thrown - * from a subsequent read rather than a -1 return. + * Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return. * * @return true if content channel woken for read */ @@ -504,8 +620,7 @@ public class HttpInput extends ServletInputStream implements Runnable } /** - * This method should be called to signal that all the expected - * content arrived. + * This method should be called to signal that all the expected content arrived. * * @return true if content channel woken for read */ @@ -526,7 +641,7 @@ public class HttpInput extends ServletInputStream implements Runnable if (item == null) break; // Let's not bother blocking - skip(item, remaining(item)); + skip(item,item.remaining()); } return isFinished() && !isError(); } @@ -641,11 +756,8 @@ public class HttpInput extends ServletInputStream implements Runnable } /* - *

- * While this class is-a Runnable, it should never be dispatched in it's own thread. It is a - * runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)} - * to setup classloaders etc. - *

+ *

While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.

*/ @Override public void run() @@ -666,7 +778,7 @@ public class HttpInput extends ServletInputStream implements Runnable } listener = _listener; - error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null; + error = _state instanceof ErrorState?((ErrorState)_state).getError():null; } try @@ -721,19 +833,24 @@ public class HttpInput extends ServletInputStream implements Runnable content = _inputQ.peekFirst(); } return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", - getClass().getSimpleName(), - hashCode(), - consumed, - q, - content, - state); + getClass().getSimpleName(), + hashCode(), + consumed, + q, + content, + state); } - public static class PoisonPillContent extends Content + /** + * A Sentinel Content, which has zero length content but + * indicates some other event in the input stream (eg EOF) + * + */ + public static class SentinelContent extends Content { private final String _name; - public PoisonPillContent(String name) + public SentinelContent(String name) { super(BufferUtil.EMPTY_BUFFER); _name = name; @@ -746,7 +863,7 @@ public class HttpInput extends ServletInputStream implements Runnable } } - public static class EofContent extends PoisonPillContent + public static class EofContent extends SentinelContent { EofContent(String name) { @@ -756,22 +873,36 @@ public class HttpInput extends ServletInputStream implements Runnable public static class Content implements Callback { - private final ByteBuffer _content; + protected final ByteBuffer _content; public Content(ByteBuffer content) { _content = content; } + public ByteBuffer getByteBuffer() + { + return _content; + } + @Override public InvocationType getInvocationType() { return InvocationType.NON_BLOCKING; } - public ByteBuffer getContent() + public int get(byte[] buffer, int offset, int length) { - return _content; + length = Math.min(_content.remaining(),length); + _content.get(buffer,offset,length); + return length; + } + + public int skip(int length) + { + length = Math.min(_content.remaining(),length); + _content.position(_content.position() + length); + return length; } public boolean hasContent() @@ -783,15 +914,19 @@ public class HttpInput extends ServletInputStream implements Runnable { return _content.remaining(); } + + public boolean isEmpty() + { + return !_content.hasRemaining(); + } @Override public String toString() { - return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content)); + return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content)); } } - protected static abstract class State { public boolean blockForContent(HttpInput in) throws IOException diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index c9a1354c73f..9d1ec89924b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -175,9 +175,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable return _interceptor; } - public void setInterceptor(Interceptor filter) + public void setInterceptor(Interceptor interceptor) { - _interceptor = filter; + _interceptor = interceptor; } public boolean isWritten() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java index 1206f1a6c06..3d5ff8df02d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java @@ -65,6 +65,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory private int _compressionLevel=Deflater.DEFAULT_COMPRESSION; private boolean _checkGzExists = true; private boolean _syncFlush = false; + private int _inflateBufferSize = -1; // non-static, as other GzipHandler instances may have different configurations private final ThreadLocal _deflater = new ThreadLocal<>(); @@ -77,6 +78,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory private HttpField _vary; + /* ------------------------------------------------------------ */ /** * Instantiates a new gzip handler. @@ -398,6 +400,24 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory return _vary; } + /* ------------------------------------------------------------ */ + /** + * @return size in bytes of the buffer to inflate compressed request, or 0 for no inflation. + */ + public int getInflateBufferSize() + { + return _inflateBufferSize; + } + + /* ------------------------------------------------------------ */ + /** + * @param size size in bytes of the buffer to inflate compressed request, or 0 for no inflation. + */ + public void setInflateBufferSize(int size) + { + _inflateBufferSize = size; + } + /* ------------------------------------------------------------ */ /** * @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse) @@ -409,6 +429,19 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory String path = context==null?baseRequest.getRequestURI():URIUtil.addPaths(baseRequest.getServletPath(),baseRequest.getPathInfo()); LOG.debug("{} handle {} in {}",this,baseRequest,context); + // Handle request inflation + if (_inflateBufferSize>0) + { + HttpField ce = baseRequest.getHttpFields().getField(HttpHeader.CONTENT_ENCODING); + if (ce!=null && "gzip".equalsIgnoreCase(ce.getValue())) + { + // TODO should check ce.contains and then remove just the gzip encoding + baseRequest.getHttpFields().remove(HttpHeader.CONTENT_ENCODING); + baseRequest.getHttpFields().add(new HttpField("X-Content-Encoding",ce.getValue())); + baseRequest.getHttpInput().addInterceptor(new GzipHttpInputInterceptor(baseRequest.getHttpChannel().getByteBufferPool(),_inflateBufferSize)); + } + } + HttpOutput out = baseRequest.getResponse().getHttpOutput(); // Are we already being gzipped? HttpOutput.Interceptor interceptor = out.getInterceptor(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java new file mode 100644 index 00000000000..97198e736f6 --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpInputInterceptor.java @@ -0,0 +1,83 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server.handler.gzip; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.http.GZIPContentDecoder; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.server.HttpInput; +import org.eclipse.jetty.server.HttpInput.Content; + +/** + * A HttpInput Interceptor that inflates GZIP encoded request content. + * + */ +public class GzipHttpInputInterceptor implements HttpInput.Interceptor +{ + class Decoder extends GZIPContentDecoder + { + public Decoder(ByteBufferPool pool, int bufferSize) + { + super(pool,bufferSize); + } + + @Override + protected boolean decodedChunk(final ByteBuffer chunk) + { + _chunk = chunk; + return true; + } + + @Override + public void decodeChunks(ByteBuffer compressed) + { + _chunk = null; + super.decodeChunks(compressed); + } + } + + private final Decoder _decoder; + private ByteBuffer _chunk; + + public GzipHttpInputInterceptor(ByteBufferPool pool, int bufferSize) + { + _decoder = new Decoder(pool,bufferSize); + } + + @Override + public Content readFrom(Content content) + { + _decoder.decodeChunks(content.getByteBuffer()); + final ByteBuffer chunk = _chunk; + + if (chunk==null) + return null; + + return new Content(chunk) + { + @Override + public void succeeded() + { + _decoder.release(chunk); + } + }; + } + +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java index 985d8c01bed..820e7ebf0c2 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java @@ -354,6 +354,7 @@ public class AsyncRequestReadTest for (int i=read;i-->0;) { int c=in.read(); + // System.err.println("in="+c); if (c<0) break; out.write(c); diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java index e892f389e5a..7b1ef878a5a 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/GzipHandlerTest.java @@ -33,8 +33,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -45,6 +47,7 @@ import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; import org.hamcrest.Matchers; import org.junit.After; @@ -86,6 +89,7 @@ public class GzipHandlerTest GzipHandler gzipHandler = new GzipHandler(); gzipHandler.setExcludedAgentPatterns(); gzipHandler.setMinGzipSize(16); + gzipHandler.setInflateBufferSize(4096); ServletContextHandler context = new ServletContextHandler(gzipHandler,"/ctx"); ServletHandler servlets = context.getServletHandler(); @@ -97,6 +101,7 @@ public class GzipHandlerTest servlets.addServletWithMapping(TestServlet.class,"/content"); servlets.addServletWithMapping(ForwardServlet.class,"/forward"); servlets.addServletWithMapping(IncludeServlet.class,"/include"); + servlets.addServletWithMapping(EchoServlet.class,"/echo/*"); _server.start(); } @@ -147,6 +152,21 @@ public class GzipHandlerTest } } } + + public static class EchoServlet extends HttpServlet + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException + { + response.setContentType(req.getContentType()); + IO.copy(req.getInputStream(),response.getOutputStream()); + } + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException + { + doGet(req,response); + } + } public static class ForwardServlet extends HttpServlet { @@ -392,4 +412,68 @@ public class GzipHandlerTest assertThat("Included Paths.size", includedPaths.length, is(2)); assertThat("Included Paths", Arrays.asList(includedPaths), contains("/foo","^/bar.*$")); } + + + @Test + public void testGzipRequest() throws Exception + { + String data = "Hello Nice World! "; + for (int i = 0; i < 10; ++i) + data += data; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data.getBytes(StandardCharsets.UTF_8)); + output.close(); + byte[] bytes = baos.toByteArray(); + + // generated and parsed test + HttpTester.Request request = HttpTester.newRequest(); + HttpTester.Response response; + + request.setMethod("POST"); + request.setURI("/ctx/echo"); + request.setVersion("HTTP/1.0"); + request.setHeader("Host","tester"); + request.setHeader("Content-Type","text/plain"); + request.setHeader("Content-Encoding","gzip"); + request.setContent(bytes); + + response = HttpTester.parseResponse(_connector.getResponse(request.generate())); + + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is(data)); + + } + + @Test + public void testGzipBomb() throws Exception + { + byte[] data = new byte[512*1024]; + Arrays.fill(data,(byte)'X'); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + output.write(data); + output.close(); + byte[] bytes = baos.toByteArray(); + + // generated and parsed test + HttpTester.Request request = HttpTester.newRequest(); + HttpTester.Response response; + + request.setMethod("POST"); + request.setURI("/ctx/echo"); + request.setVersion("HTTP/1.0"); + request.setHeader("Host","tester"); + request.setHeader("Content-Type","text/plain"); + request.setHeader("Content-Encoding","gzip"); + request.setContent(bytes); + + response = HttpTester.parseResponse(_connector.getResponse(request.generate())); + // TODO need to test back pressure works + + assertThat(response.getStatus(),is(200)); + assertThat(response.getContentBytes().length,is(512*1024)); + } + } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index 61f38956422..1110eac8f64 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.http.client; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.UncheckedIOException; @@ -58,9 +59,12 @@ import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.HttpInput.Content; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; @@ -68,6 +72,8 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import static java.nio.ByteBuffer.wrap; +import static org.eclipse.jetty.util.BufferUtil.toArray; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -1038,4 +1044,174 @@ public class AsyncIOServletTest extends AbstractTest assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); } + + + @Test + public void testAsyncIntercepted() throws Exception + { + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + System.err.println("Service "+request); + + final HttpInput httpInput = ((Request)request).getHttpInput(); + httpInput.addInterceptor(new HttpInput.Interceptor() + { + int state = 0; + Content saved; + + @Override + public Content readFrom(Content content) + { + // System.err.printf("readFrom s=%d saved=%b %s%n",state,saved!=null,content); + switch(state) + { + case 0: + // null transform + if (content.isEmpty()) + state++; + return null; + + case 1: + { + // copy transform + if (content.isEmpty()) + { + state++; + return content; + } + ByteBuffer copy = wrap(toArray(content.getByteBuffer())); + content.skip(copy.remaining()); + return new Content(copy); + } + + case 2: + // byte by byte + if (content.isEmpty()) + { + state++; + return content; + } + byte[] b = new byte[1]; + int l = content.get(b,0,1); + return new Content(wrap(b,0,l)); + + case 3: + { + // double vision + if (content.isEmpty()) + { + if (saved==null) + { + state++; + return content; + } + Content copy = saved; + saved=null; + return copy; + } + + byte[] data = toArray(content.getByteBuffer()); + content.skip(data.length); + saved = new Content(wrap(data)); + return new Content(wrap(data)); + } + + default: + return null; + } + } + }); + + AsyncContext asyncContext = request.startAsync(); + ServletInputStream input = request.getInputStream(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + while (input.isReady() && !input.isFinished()) + { + int b = input.read(); + if (b>0) + { + // System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b)); + out.write(b); + } + else + onAllDataRead(); + } + } + + @Override + public void onAllDataRead() throws IOException + { + response.getOutputStream().write(out.toByteArray()); + asyncContext.complete(); + } + + @Override + public void onError(Throwable x) + { + } + }); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch clientLatch = new CountDownLatch(1); + + String expected = + "S0" + + "S1" + + "S2" + + "S3S3" + + "S4" + + "S5" + + "S6"; + + client.newRequest(newURI()) + .method(HttpMethod.POST) + .path(servletPath) + .content(contentProvider) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded()) + { + Response response = result.getResponse(); + assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); + assertThat(getContentAsString(), Matchers.equalTo(expected)); + clientLatch.countDown(); + } + } + }); + + contentProvider.offer(BufferUtil.toBuffer("S0")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S1")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S2")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S3")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S4")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S5")); + contentProvider.flush(); + contentProvider.offer(BufferUtil.toBuffer("S6")); + contentProvider.close(); + + + Assert.assertTrue(clientLatch.await(10,TimeUnit.SECONDS)); + + + } + } From 4410610ef3b0b5cfbc9990a8a9a49d518e17883e Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 5 Oct 2016 13:50:12 +1100 Subject: [PATCH 2/2] javadoc --- .../java/org/eclipse/jetty/server/HttpInput.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index c090daab2d0..1b5e42d626e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -59,11 +59,23 @@ public class HttpInput extends ServletInputStream implements Runnable *

* Unlike inputstream wrappers that can be applied by filters, an interceptor * is entirely transparent and works with async IO APIs. + *

+ * An Interceptor may consume data from the passed content and the interceptor + * will continue to be called for the same content until the interceptor returns + * null or an empty content. Thus even if the passed content is completely consumed + * the interceptor will be called with the same content until it can no longer + * produce more content. * @see HttpInput#setInterceptor(Interceptor) * @see HttpInput#addInterceptor(Interceptor) */ public interface Interceptor { + /** + * @param content The content to be intercepted (may be empty or a {@link SentinelContent}. + * The content will be modified with any data the interceptor consumes, but there is no requirement + * that all the data is consumed by the interceptor. + * @return The intercepted content or null if interception is completed for that content. + */ Content readFrom(Content content); }