Jetty 9.4.x http interceptor #382

* Issue #382 Request compression

Added identity HttpInput.Interceptor
Moved GZIPContentDecoder to jetty-http
Reworking interceptor and GZIPContentDecoder to avoid data copies
Completed and tested GZIPContentDecoder
Implemented GzipHttpInputInterceptor
updated GzipHandler.java
updated gzip module
use common GZIP decoder
Gzip Bomb
handle read() after empty interception
This commit is contained in:
Greg Wilkins 2016-10-05 13:49:20 +11:00 committed by GitHub
parent 3a34e4d21b
commit 98bb582d45
14 changed files with 1459 additions and 473 deletions

View File

@ -18,26 +18,13 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.zip.ZipException;
import org.eclipse.jetty.util.BufferUtil;
/**
* {@link ContentDecoder} for the "gzip" encoding.
*
*/
public class GZIPContentDecoder implements ContentDecoder
public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder
{
private final Inflater inflater = new Inflater(true);
private final byte[] bytes;
private byte[] output;
private State state;
private int size;
private int value;
private byte flags;
public GZIPContentDecoder()
{
@ -46,285 +33,7 @@ public class GZIPContentDecoder implements ContentDecoder
public GZIPContentDecoder(int bufferSize)
{
this.bytes = new byte[bufferSize];
reset();
}
/**
* {@inheritDoc}
* <p>If the decoding did not produce any output, for example because it consumed gzip header
* or trailer bytes, it returns a buffer with zero capacity.</p>
* <p>This method never returns null.</p>
* <p>The given {@code buffer}'s position will be modified to reflect the bytes consumed during
* the decoding.</p>
* <p>The decoding may be finished without consuming the buffer completely if the buffer contains
* gzip bytes plus other bytes (either plain or gzipped).</p>
*/
@Override
public ByteBuffer decode(ByteBuffer buffer)
{
try
{
while (buffer.hasRemaining())
{
byte currByte = buffer.get();
switch (state)
{
case INITIAL:
{
buffer.position(buffer.position() - 1);
state = State.ID;
break;
}
case ID:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 2)
{
if (value != 0x8B1F)
throw new ZipException("Invalid gzip bytes");
state = State.CM;
}
break;
}
case CM:
{
if ((currByte & 0xFF) != 0x08)
throw new ZipException("Invalid gzip compression method");
state = State.FLG;
break;
}
case FLG:
{
flags = currByte;
state = State.MTIME;
size = 0;
value = 0;
break;
}
case MTIME:
{
// Skip the 4 MTIME bytes
++size;
if (size == 4)
state = State.XFL;
break;
}
case XFL:
{
// Skip XFL
state = State.OS;
break;
}
case OS:
{
// Skip OS
state = State.FLAGS;
break;
}
case FLAGS:
{
buffer.position(buffer.position() - 1);
if ((flags & 0x04) == 0x04)
{
state = State.EXTRA_LENGTH;
size = 0;
value = 0;
}
else if ((flags & 0x08) == 0x08)
state = State.NAME;
else if ((flags & 0x10) == 0x10)
state = State.COMMENT;
else if ((flags & 0x2) == 0x2)
{
state = State.HCRC;
size = 0;
value = 0;
}
else
state = State.DATA;
break;
}
case EXTRA_LENGTH:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 2)
state = State.EXTRA;
break;
}
case EXTRA:
{
// Skip EXTRA bytes
--value;
if (value == 0)
{
// Clear the EXTRA flag and loop on the flags
flags &= ~0x04;
state = State.FLAGS;
}
break;
}
case NAME:
{
// Skip NAME bytes
if (currByte == 0)
{
// Clear the NAME flag and loop on the flags
flags &= ~0x08;
state = State.FLAGS;
}
break;
}
case COMMENT:
{
// Skip COMMENT bytes
if (currByte == 0)
{
// Clear the COMMENT flag and loop on the flags
flags &= ~0x10;
state = State.FLAGS;
}
break;
}
case HCRC:
{
// Skip HCRC
++size;
if (size == 2)
{
// Clear the HCRC flag and loop on the flags
flags &= ~0x02;
state = State.FLAGS;
}
break;
}
case DATA:
{
buffer.position(buffer.position() - 1);
while (true)
{
int decoded = inflate(bytes);
if (decoded == 0)
{
if (inflater.needsInput())
{
if (buffer.hasRemaining())
{
byte[] input = new byte[buffer.remaining()];
buffer.get(input);
inflater.setInput(input);
}
else
{
if (output != null)
{
ByteBuffer result = ByteBuffer.wrap(output);
output = null;
return result;
}
break;
}
}
else if (inflater.finished())
{
int remaining = inflater.getRemaining();
buffer.position(buffer.limit() - remaining);
state = State.CRC;
size = 0;
value = 0;
break;
}
else
{
throw new ZipException("Invalid inflater state");
}
}
else
{
if (output == null)
{
// Save the inflated bytes and loop to see if we have finished
output = Arrays.copyOf(bytes, decoded);
}
else
{
// Accumulate inflated bytes and loop to see if we have finished
byte[] newOutput = Arrays.copyOf(output, output.length + decoded);
System.arraycopy(bytes, 0, newOutput, output.length, decoded);
output = newOutput;
}
}
}
break;
}
case CRC:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 4)
{
// From RFC 1952, compliant decoders need not to verify the CRC
state = State.ISIZE;
size = 0;
value = 0;
}
break;
}
case ISIZE:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 4)
{
if (value != inflater.getBytesWritten())
throw new ZipException("Invalid input size");
ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
reset();
return result;
}
break;
}
default:
throw new ZipException();
}
}
return BufferUtil.EMPTY_BUFFER;
}
catch (ZipException x)
{
throw new RuntimeException(x);
}
}
private int inflate(byte[] bytes) throws ZipException
{
try
{
return inflater.inflate(bytes);
}
catch (DataFormatException x)
{
throw new ZipException(x.getMessage());
}
}
private void reset()
{
inflater.reset();
Arrays.fill(bytes, (byte)0);
output = null;
state = State.INITIAL;
size = 0;
value = 0;
flags = 0;
}
protected boolean isFinished()
{
return state == State.INITIAL;
super(null,bufferSize);
}
/**
@ -351,9 +60,4 @@ public class GZIPContentDecoder implements ContentDecoder
return new GZIPContentDecoder(bufferSize);
}
}
private enum State
{
INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
}
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.Rule;
import org.junit.Test;
@Deprecated
public class GZIPContentDecoderTest
{
@Rule

View File

@ -18,6 +18,11 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -0,0 +1,416 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.zip.ZipException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
/**
* Decoder for the "gzip" encoding.
* <p>An Decode that inflates gzip compressed data that has been
* optimized for async usage with minimal data copies.
*
*/
public class GZIPContentDecoder
{
private final Inflater _inflater = new Inflater(true);
private final ByteBufferPool _pool;
private final int _bufferSize;
private State _state;
private int _size;
private int _value;
private byte _flags;
private ByteBuffer _inflated;
public GZIPContentDecoder()
{
this(null,2048);
}
public GZIPContentDecoder(int bufferSize)
{
this(null,bufferSize);
}
public GZIPContentDecoder(ByteBufferPool pool, int bufferSize)
{
_bufferSize = bufferSize;
_pool = pool;
reset();
}
/** Inflate compressed data from a buffer.
*
* @param compressed Buffer containing compressed data.
* @return Buffer containing inflated data.
*/
public ByteBuffer decode(ByteBuffer compressed)
{
decodeChunks(compressed);
if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE )
return BufferUtil.EMPTY_BUFFER;
ByteBuffer result = _inflated;
_inflated = null;
return result;
}
/** Called when a chunk of data is inflated.
* <p>The default implementation aggregates all the chunks
* into a single buffer returned from {@link #decode(ByteBuffer)}.
* Derived implementations may choose to consume chunks individually
* and return false to prevent further inflation until a subsequent
* call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}.
*
* @param chunk The inflated chunk of data
* @return False if inflating should continue, or True if the call
* to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)}
* should return, allowing back pressure of compressed data.
*/
protected boolean decodedChunk(ByteBuffer chunk)
{
if (_inflated==null)
_inflated=chunk;
else
{
int size = _inflated.remaining() + chunk.remaining();
if (size<=_inflated.capacity())
{
BufferUtil.append(_inflated,chunk);
BufferUtil.put(chunk,_inflated);
release(chunk);
}
else
{
ByteBuffer bigger=_pool==null?BufferUtil.allocate(size):_pool.acquire(size,false);
int pos=BufferUtil.flipToFill(bigger);
BufferUtil.put(_inflated,bigger);
BufferUtil.put(chunk,bigger);
BufferUtil.flipToFlush(bigger,pos);
release(_inflated);
release(chunk);
_inflated = bigger;
}
}
return false;
}
/**
* Inflate compressed data.
* <p>Inflation continues until the compressed block end is reached, there is no
* more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.
* @param compressed Buffer of compressed data to inflate
*/
protected void decodeChunks(ByteBuffer compressed)
{
ByteBuffer buffer = null;
try
{
while (true)
{
switch (_state)
{
case INITIAL:
{
_state = State.ID;
break;
}
case FLAGS:
{
if ((_flags & 0x04) == 0x04)
{
_state = State.EXTRA_LENGTH;
_size = 0;
_value = 0;
}
else if ((_flags & 0x08) == 0x08)
_state = State.NAME;
else if ((_flags & 0x10) == 0x10)
_state = State.COMMENT;
else if ((_flags & 0x2) == 0x2)
{
_state = State.HCRC;
_size = 0;
_value = 0;
}
else
{
_state = State.DATA;
continue;
}
break;
}
case DATA:
{
while (true)
{
if (buffer==null)
buffer = acquire();
try
{
int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity());
buffer.limit(length);
}
catch (DataFormatException x)
{
throw new ZipException(x.getMessage());
}
if (buffer.hasRemaining())
{
ByteBuffer chunk = buffer;
buffer = null;
if (decodedChunk(chunk))
return;
}
else if (_inflater.needsInput())
{
if (!compressed.hasRemaining())
return;
if (compressed.hasArray())
{
_inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining());
compressed.position(compressed.limit());
}
else
{
// TODO use the pool
byte[] input = new byte[compressed.remaining()];
compressed.get(input);
_inflater.setInput(input);
}
}
else if (_inflater.finished())
{
int remaining = _inflater.getRemaining();
compressed.position(compressed.limit() - remaining);
_state = State.CRC;
_size = 0;
_value = 0;
break;
}
}
continue;
}
default:
break;
}
if (!compressed.hasRemaining())
break;
byte currByte = compressed.get();
switch (_state)
{
case ID:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 2)
{
if (_value != 0x8B1F)
throw new ZipException("Invalid gzip bytes");
_state = State.CM;
}
break;
}
case CM:
{
if ((currByte & 0xFF) != 0x08)
throw new ZipException("Invalid gzip compression method");
_state = State.FLG;
break;
}
case FLG:
{
_flags = currByte;
_state = State.MTIME;
_size = 0;
_value = 0;
break;
}
case MTIME:
{
// Skip the 4 MTIME bytes
++_size;
if (_size == 4)
_state = State.XFL;
break;
}
case XFL:
{
// Skip XFL
_state = State.OS;
break;
}
case OS:
{
// Skip OS
_state = State.FLAGS;
break;
}
case EXTRA_LENGTH:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 2)
_state = State.EXTRA;
break;
}
case EXTRA:
{
// Skip EXTRA bytes
--_value;
if (_value == 0)
{
// Clear the EXTRA flag and loop on the flags
_flags &= ~0x04;
_state = State.FLAGS;
}
break;
}
case NAME:
{
// Skip NAME bytes
if (currByte == 0)
{
// Clear the NAME flag and loop on the flags
_flags &= ~0x08;
_state = State.FLAGS;
}
break;
}
case COMMENT:
{
// Skip COMMENT bytes
if (currByte == 0)
{
// Clear the COMMENT flag and loop on the flags
_flags &= ~0x10;
_state = State.FLAGS;
}
break;
}
case HCRC:
{
// Skip HCRC
++_size;
if (_size == 2)
{
// Clear the HCRC flag and loop on the flags
_flags &= ~0x02;
_state = State.FLAGS;
}
break;
}
case CRC:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 4)
{
// From RFC 1952, compliant decoders need not to verify the CRC
_state = State.ISIZE;
_size = 0;
_value = 0;
}
break;
}
case ISIZE:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 4)
{
if (_value != _inflater.getBytesWritten())
throw new ZipException("Invalid input size");
// TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
reset();
return ;
}
break;
}
default:
throw new ZipException();
}
}
}
catch (ZipException x)
{
throw new RuntimeException(x);
}
finally
{
if (buffer!=null)
release(buffer);
}
}
private void reset()
{
_inflater.reset();
_state = State.INITIAL;
_size = 0;
_value = 0;
_flags = 0;
}
public boolean isFinished()
{
return _state == State.INITIAL;
}
private enum State
{
INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
}
/**
* @return An indirect buffer of the configured buffersize either from the pool or freshly allocated.
*/
public ByteBuffer acquire()
{
return _pool==null?BufferUtil.allocate(_bufferSize):_pool.acquire(_bufferSize,false);
}
/**
* Release an allocated buffer.
* <p>This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has
* been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)}
* or passed to {@link #decodedChunk(ByteBuffer)}.
* @param buffer The buffer to release.
*/
public void release(ByteBuffer buffer)
{
if (_pool!=null && buffer!=BufferUtil.EMPTY_BUFFER)
_pool.release(buffer);
}
}

View File

@ -0,0 +1,343 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class GZIPContentDecoderTest
{
@Rule
public final TestTracker tracker = new TestTracker();
ArrayByteBufferPool pool;
AtomicInteger buffers = new AtomicInteger(0);
@Before
public void beforeClass() throws Exception
{
buffers.set(0);
pool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
buffers.incrementAndGet();
return super.acquire(size,direct);
}
@Override
public void release(ByteBuffer buffer)
{
buffers.decrementAndGet();
super.release(buffer);
}
};
}
@After
public void afterClass() throws Exception
{
assertEquals(0,buffers.get());
}
@Test
public void testStreamNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read = input.read();
assertEquals(-1, read);
}
@Test
public void testStreamBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
baos = new ByteArrayOutputStream();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read;
while ((read = input.read()) >= 0)
baos.write(read);
assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8));
}
@Test
public void testNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(0, decoded.remaining());
}
@Test
public void testSmallBlock() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The header is 10 bytes, chunk at 11 bytes
byte[] bytes1 = new byte[11];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 8 bytes, chunk the last 9 bytes
byte[] bytes1 = new byte[bytes.length - 9];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
assertFalse(decoder.isFinished());
decoder.release(decoded);
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(0, decoded.remaining());
assertTrue(decoder.isFinished());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPTrailerChunked() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 4+4 bytes, chunk the last 3 bytes
byte[] bytes1 = new byte[bytes.length - 3];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoder.release(decoded);
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testTwoSmallBlocks() throws Exception
{
String data1 = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "1";
baos = new ByteArrayOutputStream();
output = new GZIPOutputStream(baos);
output.write(data2.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes2 = baos.toByteArray();
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer decoded = decoder.decode(buffer);
assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertTrue(buffer.hasRemaining());
decoder.release(decoded);
decoded = decoder.decode(buffer);
assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertFalse(buffer.hasRemaining());
decoder.release(decoded);
}
@Test
public void testBigBlock() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
}
assertEquals(data, result);
}
@Test
public void testBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()}));
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
}
assertEquals(data, result);
assertTrue(decoder.isFinished());
}
@Test
public void testBigBlockWithExtraBytes() throws Exception
{
String data1 = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data1 += data1;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "HELLO";
byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8);
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
if (decoder.isFinished())
break;
}
assertEquals(data1, result);
assertTrue(buffer.hasRemaining());
assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString());
}
}

View File

@ -15,6 +15,8 @@
<Set name="minGzipSize"><Property name="jetty.gzip.minGzipSize" deprecated="gzip.minGzipSize" default="2048"/></Set>
<Set name="checkGzExists"><Property name="jetty.gzip.checkGzExists" deprecated="gzip.checkGzExists" default="false"/></Set>
<Set name="compressionLevel"><Property name="jetty.gzip.compressionLevel" deprecated="gzip.compressionLevel" default="-1"/></Set>
<Set name="inflateBufferSize"><Property name="jetty.gzip.inflateBufferSize" default="0/></Set>
<Set name="excludedAgentPatterns">
<Array type="String">
<Item><Property name="jetty.gzip.excludedUserAgent" deprecated="gzip.excludedUserAgent" default=".*MSIE.6\.0.*"/></Item>

View File

@ -20,3 +20,6 @@ etc/jetty-gzip.xml
## User agents for which gzip is disabled
# jetty.gzip.excludedUserAgent=.*MSIE.6\.0.*
## Inflate request buffer size, or 0 for no request inflation
# jetty.gzip.inflateBufferSize=0

View File

@ -25,6 +25,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -39,23 +40,75 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
* <p>
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
* maintains two states: the content state that tells whether there is content to consume and the EOF
* state that tells whether an EOF has arrived.
* Only once the content has been consumed the content state is moved to the EOF state.
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class maintains two states: the content state that tells
* whether there is content to consume and the EOF state that tells whether an EOF has arrived. Only once the content has been consumed the content state is
* moved to the EOF state.
*/
/**
* @author gregw
*
*/
public class HttpInput extends ServletInputStream implements Runnable
{
/**
* An interceptor for HTTP Request input.
* <p>
* Unlike inputstream wrappers that can be applied by filters, an interceptor
* is entirely transparent and works with async IO APIs.
* @see HttpInput#setInterceptor(Interceptor)
* @see HttpInput#addInterceptor(Interceptor)
*/
public interface Interceptor
{
Content readFrom(Content content);
}
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link #readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link #readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
public static class ChainedInterceptor implements Interceptor
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return _next.readFrom(_prev.readFrom(content));
}
}
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private Content _content;
private Content _intercepted;
private final Deque<Content> _inputQ = new ArrayDeque<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
@ -64,6 +117,7 @@ public class HttpInput extends ServletInputStream implements Runnable
private long _contentArrived;
private long _contentConsumed;
private long _blockUntil;
private Interceptor _interceptor;
public HttpInput(HttpChannelState state)
{
@ -79,6 +133,9 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
if (_content!=null)
_content.failed(null);
_content = null;
Content item = _inputQ.poll();
while (item != null)
{
@ -91,9 +148,40 @@ public class HttpInput extends ServletInputStream implements Runnable
_contentConsumed = 0;
_firstByteTimeStamp = -1;
_blockUntil = 0;
_interceptor = null;
}
}
/**
* @return The current Interceptor, or null if none set
*/
public Interceptor getInterceptor()
{
return _interceptor;
}
/**
* Set the interceptor.
* @param interceptor The interceptor to use.
*/
public void setInterceptor(Interceptor interceptor)
{
_interceptor = interceptor;
}
/**
* Set the {@link Interceptor}, using a {@link ChainedInterceptor} if
* an {@link Interceptor} is already set.
* @param interceptor the next {@link Interceptor} in a chain
*/
public void addInterceptor(Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new ChainedInterceptor(_interceptor,interceptor);
}
@Override
public int available()
{
@ -101,8 +189,9 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
Content content = _inputQ.peek();
if (content == null)
if (_content == null)
_content = _inputQ.poll();
if (_content == null)
{
try
{
@ -112,11 +201,12 @@ public class HttpInput extends ServletInputStream implements Runnable
{
woken = failed(e);
}
content = _inputQ.peek();
if (_content == null)
_content = _inputQ.poll();
}
if (content != null)
available = remaining(content);
if (_content != null)
available = _content.remaining();
}
if (woken)
@ -139,10 +229,10 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
int read = read(_oneByteBuffer,0,1);
if (read == 0)
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
return read < 0?-1:_oneByteBuffer[0] & 0xFF;
}
@Override
@ -168,7 +258,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate));
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate));
}
}
@ -177,11 +267,12 @@ public class HttpInput extends ServletInputStream implements Runnable
Content item = nextContent();
if (item != null)
{
int l = get(item, b, off, len);
int l = get(item,b,off,len);
if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}", this, l, item);
LOG.debug("{} read {} from {}",this,l,item);
consumeNonContent();
// Consume any following poison pills
pollReadableContent();
return l;
}
@ -193,191 +284,207 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* Called when derived implementations should attempt to
* produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop;
* Called when derived implementations should attempt to produce more Content and add it via {@link #addContent(Content)}. For protocols that are constantly
* producing (eg HTTP2) this can be left as a noop;
*
* @throws IOException if unable to produce content
* @throws IOException
* if unable to produce content
*/
protected void produceContent() throws IOException
{
}
/**
* Get the next content from the inputQ, calling {@link #produceContent()}
* if need be. EOF is processed and state changed.
* Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed.
*
* @return the content or null if none available.
* @throws IOException if retrieving the content fails
* @throws IOException
* if retrieving the content fails
*/
protected Content nextContent() throws IOException
{
Content content = pollContent();
Content content = pollNonEmptyContent();
if (content == null && !isFinished())
{
produceContent();
content = pollContent();
content = pollNonEmptyContent();
}
return content;
}
/**
* Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be.
* Poll the inputQ for Content. Consumed buffers and {@link SentinelContent}s are removed and EOF state updated if need be.
*
* @return Content or null
*/
protected Content pollContent()
protected Content pollNonEmptyContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
{
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
if (content == EOF_CONTENT)
while (true)
{
// Get the next content (or EOF)
Content content = pollReadableContent();
// If it is EOF, consume it here
if (content instanceof SentinelContent)
{
if (_listener == null)
_state = EOF;
else
if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
else if (content instanceof EofContent)
{
_state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
if (_listener == null)
_state = EOF;
else
{
_state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
}
}
// Consume the EOF content, either if it was original content
// or if it was produced by interception
content.succeeded();
if (_content==content)
_content = null;
else if (_intercepted==content)
_intercepted = null;
continue;
}
else if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
content = _inputQ.peek();
return content;
}
return content;
}
/**
* Poll the inputQ for Content or EOF. Consumed buffers and non EOF {@link SentinelContent}s are removed. EOF state is not updated.
* Interception is done within this method.
* @return Content with remaining, a {@link SentinelContent}, or null
*/
protected void consumeNonContent()
protected Content pollReadableContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
// If we have a chunk produced by interception
if (_intercepted!=null)
{
// Defer EOF until read
if (content instanceof EofContent)
break;
// Consume all other empty content
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
// Use it if it has any remaining content
if (_intercepted.hasContent())
return _intercepted;
// succeed the chunk
_intercepted.succeeded();
_intercepted=null;
}
// If we don't have a Content under consideration, get
// the next one off the input Q.
if (_content == null)
_content = _inputQ.poll();
// While we have content to consider.
while (_content!=null)
{
// Are we intercepting?
if (_interceptor!=null)
{
// Intercept the current content (may be called several
// times for the same content
_intercepted = _interceptor.readFrom(_content);
// If interception produced new content
if (_intercepted!=null && _intercepted!=_content)
{
// if it is not empty use it
if (_intercepted.hasContent())
return _intercepted;
_intercepted.succeeded();
}
// intercepted content consumed
_intercepted=null;
// fall through so that the unintercepted _content is
// considered for any remaining content, for EOF and to
// succeed it if it is entirely consumed.
}
// If the content has content or is an EOF marker, use it
if (_content.hasContent() || _content instanceof SentinelContent)
return _content;
// The content is consumed, so get the next one. Note that EOF
// content is never consumed here, but in #pollContent
_content.succeeded();
_content = _inputQ.poll();
}
return null;
}
/**
* Get the next readable from the inputQ, calling {@link #produceContent()}
* if need be. EOF is NOT processed and state is not changed.
* Get the next readable from the inputQ, calling {@link #produceContent()} if need be. EOF is NOT processed and state is not changed.
*
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
* @throws IOException
* if retrieving the content fails
*/
protected Content nextReadable() throws IOException
{
Content content = pollReadable();
Content content = pollReadableContent();
if (content == null && !isFinished())
{
produceContent();
content = pollReadable();
content = pollReadableContent();
}
return content;
}
/**
* Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated.
*
* @return Content, EOF or null
*/
protected Content pollReadable()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue except EOF
while (content != null)
{
if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0)
return content;
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
}
return null;
}
/**
* @param item the content
* @return how many bytes remain in the given content
*/
protected int remaining(Content item)
{
return item.remaining();
}
/**
* Copies the given content into the given byte buffer.
*
* @param content the content to copy from
* @param buffer the buffer to copy into
* @param offset the buffer offset to start copying from
* @param length the space available in the buffer
* @param content
* the content to copy from
* @param buffer
* the buffer to copy into
* @param offset
* the buffer offset to start copying from
* @param length
* the space available in the buffer
* @return the number of bytes actually copied
*/
protected int get(Content content, byte[] buffer, int offset, int length)
{
int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l);
int l = content.get(buffer,offset,length);
_contentConsumed += l;
return l;
}
/**
* Consumes the given content.
* Calls the content succeeded if all content consumed.
* Consumes the given content. Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
* @param content
* the content to consume
* @param length
* the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent();
buffer.position(buffer.position() + l);
int l = content.skip(length);
_contentConsumed += l;
if (l > 0 && !content.hasContent())
pollContent(); // hungry succeed
if (l > 0 && content.isEmpty())
pollNonEmptyContent(); // hungry succeed
}
/**
* Blocks until some content or some end-of-file event arrives.
*
* @throws IOException if the wait is interrupted
* @throws IOException
* if the wait is interrupted
*/
protected void blockForContent() throws IOException
{
@ -392,7 +499,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}", this, timeout);
LOG.debug("{} blocking for content timeout={}",this,timeout);
if (timeout > 0)
_inputQ.wait(timeout);
else
@ -402,7 +509,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout()));
}
catch (Throwable e)
{
@ -412,11 +519,12 @@ public class HttpInput extends ServletInputStream implements Runnable
/**
* Adds some content to the start of this input stream.
* <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p>
* <p>
* Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total
* </p>
*
* @param item the content to add
* @param item
* the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
@ -424,44 +532,54 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
_inputQ.push(item);
if (_content != null)
_inputQ.push(_content);
_content = item;
_contentConsumed -= item.remaining();
if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item);
LOG.debug("{} prependContent {}",this,item);
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onReadPossible();
}
return woken;
}
/**
* Adds some content to this input stream.
*
* @param item the content to add
* @param content
* the content to add
* @return true if content channel woken for read
*/
public boolean addContent(Content item)
public boolean addContent(Content content)
{
boolean woken = false;
synchronized (_inputQ)
{
if (_firstByteTimeStamp == -1)
_firstByteTimeStamp = System.nanoTime();
_contentArrived += item.remaining();
_inputQ.offer(item);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, item);
if (_listener == null)
_inputQ.notify();
_contentArrived += content.remaining();
if (_content==null && _inputQ.isEmpty())
_content=content;
else
woken = _channelState.onReadPossible();
}
_inputQ.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}",this,content);
if (pollReadableContent()!=null)
{
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onReadPossible();
}
}
return woken;
}
@ -469,7 +587,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
return _inputQ.size() > 0;
return _content!=null || _inputQ.size() > 0;
}
}
@ -490,11 +608,9 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* This method should be called to signal that an EOF has been
* detected before all the expected content arrived.
* This method should be called to signal that an EOF has been detected before all the expected content arrived.
* <p>
* Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return.
* Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return.
*
* @return true if content channel woken for read
*/
@ -504,8 +620,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* This method should be called to signal that all the expected
* content arrived.
* This method should be called to signal that all the expected content arrived.
*
* @return true if content channel woken for read
*/
@ -526,7 +641,7 @@ public class HttpInput extends ServletInputStream implements Runnable
if (item == null)
break; // Let's not bother blocking
skip(item, remaining(item));
skip(item,item.remaining());
}
return isFinished() && !isError();
}
@ -641,11 +756,8 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/*
* <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
* runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
* to setup classloaders etc.
* </p>
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
@ -666,7 +778,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
listener = _listener;
error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
}
try
@ -721,19 +833,24 @@ public class HttpInput extends ServletInputStream implements Runnable
content = _inputQ.peekFirst();
}
return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]",
getClass().getSimpleName(),
hashCode(),
consumed,
q,
content,
state);
getClass().getSimpleName(),
hashCode(),
consumed,
q,
content,
state);
}
public static class PoisonPillContent extends Content
/**
* A Sentinel Content, which has zero length content but
* indicates some other event in the input stream (eg EOF)
*
*/
public static class SentinelContent extends Content
{
private final String _name;
public PoisonPillContent(String name)
public SentinelContent(String name)
{
super(BufferUtil.EMPTY_BUFFER);
_name = name;
@ -746,7 +863,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
}
public static class EofContent extends PoisonPillContent
public static class EofContent extends SentinelContent
{
EofContent(String name)
{
@ -756,22 +873,36 @@ public class HttpInput extends ServletInputStream implements Runnable
public static class Content implements Callback
{
private final ByteBuffer _content;
protected final ByteBuffer _content;
public Content(ByteBuffer content)
{
_content = content;
}
public ByteBuffer getByteBuffer()
{
return _content;
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
public ByteBuffer getContent()
public int get(byte[] buffer, int offset, int length)
{
return _content;
length = Math.min(_content.remaining(),length);
_content.get(buffer,offset,length);
return length;
}
public int skip(int length)
{
length = Math.min(_content.remaining(),length);
_content.position(_content.position() + length);
return length;
}
public boolean hasContent()
@ -783,15 +914,19 @@ public class HttpInput extends ServletInputStream implements Runnable
{
return _content.remaining();
}
public boolean isEmpty()
{
return !_content.hasRemaining();
}
@Override
public String toString()
{
return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content));
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
}
}
protected static abstract class State
{
public boolean blockForContent(HttpInput in) throws IOException

View File

@ -175,9 +175,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _interceptor;
}
public void setInterceptor(Interceptor filter)
public void setInterceptor(Interceptor interceptor)
{
_interceptor = filter;
_interceptor = interceptor;
}
public boolean isWritten()

View File

@ -65,6 +65,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private int _compressionLevel=Deflater.DEFAULT_COMPRESSION;
private boolean _checkGzExists = true;
private boolean _syncFlush = false;
private int _inflateBufferSize = -1;
// non-static, as other GzipHandler instances may have different configurations
private final ThreadLocal<Deflater> _deflater = new ThreadLocal<>();
@ -77,6 +78,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private HttpField _vary;
/* ------------------------------------------------------------ */
/**
* Instantiates a new gzip handler.
@ -398,6 +400,24 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
return _vary;
}
/* ------------------------------------------------------------ */
/**
* @return size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
*/
public int getInflateBufferSize()
{
return _inflateBufferSize;
}
/* ------------------------------------------------------------ */
/**
* @param size size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
*/
public void setInflateBufferSize(int size)
{
_inflateBufferSize = size;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
@ -409,6 +429,19 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
String path = context==null?baseRequest.getRequestURI():URIUtil.addPaths(baseRequest.getServletPath(),baseRequest.getPathInfo());
LOG.debug("{} handle {} in {}",this,baseRequest,context);
// Handle request inflation
if (_inflateBufferSize>0)
{
HttpField ce = baseRequest.getHttpFields().getField(HttpHeader.CONTENT_ENCODING);
if (ce!=null && "gzip".equalsIgnoreCase(ce.getValue()))
{
// TODO should check ce.contains and then remove just the gzip encoding
baseRequest.getHttpFields().remove(HttpHeader.CONTENT_ENCODING);
baseRequest.getHttpFields().add(new HttpField("X-Content-Encoding",ce.getValue()));
baseRequest.getHttpInput().addInterceptor(new GzipHttpInputInterceptor(baseRequest.getHttpChannel().getByteBufferPool(),_inflateBufferSize));
}
}
HttpOutput out = baseRequest.getResponse().getHttpOutput();
// Are we already being gzipped?
HttpOutput.Interceptor interceptor = out.getInterceptor();

View File

@ -0,0 +1,83 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.handler.gzip;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.GZIPContentDecoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpInput.Content;
/**
* A HttpInput Interceptor that inflates GZIP encoded request content.
*
*/
public class GzipHttpInputInterceptor implements HttpInput.Interceptor
{
class Decoder extends GZIPContentDecoder
{
public Decoder(ByteBufferPool pool, int bufferSize)
{
super(pool,bufferSize);
}
@Override
protected boolean decodedChunk(final ByteBuffer chunk)
{
_chunk = chunk;
return true;
}
@Override
public void decodeChunks(ByteBuffer compressed)
{
_chunk = null;
super.decodeChunks(compressed);
}
}
private final Decoder _decoder;
private ByteBuffer _chunk;
public GzipHttpInputInterceptor(ByteBufferPool pool, int bufferSize)
{
_decoder = new Decoder(pool,bufferSize);
}
@Override
public Content readFrom(Content content)
{
_decoder.decodeChunks(content.getByteBuffer());
final ByteBuffer chunk = _chunk;
if (chunk==null)
return null;
return new Content(chunk)
{
@Override
public void succeeded()
{
_decoder.release(chunk);
}
};
}
}

View File

@ -354,6 +354,7 @@ public class AsyncRequestReadTest
for (int i=read;i-->0;)
{
int c=in.read();
// System.err.println("in="+c);
if (c<0)
break;
out.write(c);

View File

@ -33,8 +33,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@ -45,6 +47,7 @@ import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.After;
@ -86,6 +89,7 @@ public class GzipHandlerTest
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setExcludedAgentPatterns();
gzipHandler.setMinGzipSize(16);
gzipHandler.setInflateBufferSize(4096);
ServletContextHandler context = new ServletContextHandler(gzipHandler,"/ctx");
ServletHandler servlets = context.getServletHandler();
@ -97,6 +101,7 @@ public class GzipHandlerTest
servlets.addServletWithMapping(TestServlet.class,"/content");
servlets.addServletWithMapping(ForwardServlet.class,"/forward");
servlets.addServletWithMapping(IncludeServlet.class,"/include");
servlets.addServletWithMapping(EchoServlet.class,"/echo/*");
_server.start();
}
@ -147,6 +152,21 @@ public class GzipHandlerTest
}
}
}
public static class EchoServlet extends HttpServlet
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
{
response.setContentType(req.getContentType());
IO.copy(req.getInputStream(),response.getOutputStream());
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
{
doGet(req,response);
}
}
public static class ForwardServlet extends HttpServlet
{
@ -392,4 +412,68 @@ public class GzipHandlerTest
assertThat("Included Paths.size", includedPaths.length, is(2));
assertThat("Included Paths", Arrays.asList(includedPaths), contains("/foo","^/bar.*$"));
}
@Test
public void testGzipRequest() throws Exception
{
String data = "Hello Nice World! ";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// generated and parsed test
HttpTester.Request request = HttpTester.newRequest();
HttpTester.Response response;
request.setMethod("POST");
request.setURI("/ctx/echo");
request.setVersion("HTTP/1.0");
request.setHeader("Host","tester");
request.setHeader("Content-Type","text/plain");
request.setHeader("Content-Encoding","gzip");
request.setContent(bytes);
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is(data));
}
@Test
public void testGzipBomb() throws Exception
{
byte[] data = new byte[512*1024];
Arrays.fill(data,(byte)'X');
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data);
output.close();
byte[] bytes = baos.toByteArray();
// generated and parsed test
HttpTester.Request request = HttpTester.newRequest();
HttpTester.Response response;
request.setMethod("POST");
request.setURI("/ctx/echo");
request.setVersion("HTTP/1.0");
request.setHeader("Host","tester");
request.setHeader("Content-Type","text/plain");
request.setHeader("Content-Encoding","gzip");
request.setContent(bytes);
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
// TODO need to test back pressure works
assertThat(response.getStatus(),is(200));
assertThat(response.getContentBytes().length,is(512*1024));
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
@ -58,9 +59,12 @@ import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
@ -68,6 +72,8 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@ -1038,4 +1044,174 @@ public class AsyncIOServletTest extends AbstractTest
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncIntercepted() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
System.err.println("Service "+request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(new HttpInput.Interceptor()
{
int state = 0;
Content saved;
@Override
public Content readFrom(Content content)
{
// System.err.printf("readFrom s=%d saved=%b %s%n",state,saved!=null,content);
switch(state)
{
case 0:
// null transform
if (content.isEmpty())
state++;
return null;
case 1:
{
// copy transform
if (content.isEmpty())
{
state++;
return content;
}
ByteBuffer copy = wrap(toArray(content.getByteBuffer()));
content.skip(copy.remaining());
return new Content(copy);
}
case 2:
// byte by byte
if (content.isEmpty())
{
state++;
return content;
}
byte[] b = new byte[1];
int l = content.get(b,0,1);
return new Content(wrap(b,0,l));
case 3:
{
// double vision
if (content.isEmpty())
{
if (saved==null)
{
state++;
return content;
}
Content copy = saved;
saved=null;
return copy;
}
byte[] data = toArray(content.getByteBuffer());
content.skip(data.length);
saved = new Content(wrap(data));
return new Content(wrap(data));
}
default:
return null;
}
}
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady() && !input.isFinished())
{
int b = input.read();
if (b>0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
out.write(b);
}
else
onAllDataRead();
}
}
@Override
public void onAllDataRead() throws IOException
{
response.getOutputStream().write(out.toByteArray());
asyncContext.complete();
}
@Override
public void onError(Throwable x)
{
}
});
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"S0" +
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";
client.newRequest(newURI())
.method(HttpMethod.POST)
.path(servletPath)
.content(contentProvider)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
Response response = result.getResponse();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
assertThat(getContentAsString(), Matchers.equalTo(expected));
clientLatch.countDown();
}
}
});
contentProvider.offer(BufferUtil.toBuffer("S0"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S1"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S2"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S3"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S4"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S5"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S6"));
contentProvider.close();
Assert.assertTrue(clientLatch.await(10,TimeUnit.SECONDS));
}
}