Merge remote-tracking branch 'origin/jetty-9.4.x'

This commit is contained in:
Greg Wilkins 2016-10-05 13:51:03 +11:00
commit be1726e251
14 changed files with 1471 additions and 473 deletions

View File

@ -18,26 +18,13 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.zip.ZipException;
import org.eclipse.jetty.util.BufferUtil;
/**
* {@link ContentDecoder} for the "gzip" encoding.
*
*/
public class GZIPContentDecoder implements ContentDecoder
public class GZIPContentDecoder extends org.eclipse.jetty.http.GZIPContentDecoder implements ContentDecoder
{
private final Inflater inflater = new Inflater(true);
private final byte[] bytes;
private byte[] output;
private State state;
private int size;
private int value;
private byte flags;
public GZIPContentDecoder()
{
@ -46,285 +33,7 @@ public class GZIPContentDecoder implements ContentDecoder
public GZIPContentDecoder(int bufferSize)
{
this.bytes = new byte[bufferSize];
reset();
}
/**
* {@inheritDoc}
* <p>If the decoding did not produce any output, for example because it consumed gzip header
* or trailer bytes, it returns a buffer with zero capacity.</p>
* <p>This method never returns null.</p>
* <p>The given {@code buffer}'s position will be modified to reflect the bytes consumed during
* the decoding.</p>
* <p>The decoding may be finished without consuming the buffer completely if the buffer contains
* gzip bytes plus other bytes (either plain or gzipped).</p>
*/
@Override
public ByteBuffer decode(ByteBuffer buffer)
{
try
{
while (buffer.hasRemaining())
{
byte currByte = buffer.get();
switch (state)
{
case INITIAL:
{
buffer.position(buffer.position() - 1);
state = State.ID;
break;
}
case ID:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 2)
{
if (value != 0x8B1F)
throw new ZipException("Invalid gzip bytes");
state = State.CM;
}
break;
}
case CM:
{
if ((currByte & 0xFF) != 0x08)
throw new ZipException("Invalid gzip compression method");
state = State.FLG;
break;
}
case FLG:
{
flags = currByte;
state = State.MTIME;
size = 0;
value = 0;
break;
}
case MTIME:
{
// Skip the 4 MTIME bytes
++size;
if (size == 4)
state = State.XFL;
break;
}
case XFL:
{
// Skip XFL
state = State.OS;
break;
}
case OS:
{
// Skip OS
state = State.FLAGS;
break;
}
case FLAGS:
{
buffer.position(buffer.position() - 1);
if ((flags & 0x04) == 0x04)
{
state = State.EXTRA_LENGTH;
size = 0;
value = 0;
}
else if ((flags & 0x08) == 0x08)
state = State.NAME;
else if ((flags & 0x10) == 0x10)
state = State.COMMENT;
else if ((flags & 0x2) == 0x2)
{
state = State.HCRC;
size = 0;
value = 0;
}
else
state = State.DATA;
break;
}
case EXTRA_LENGTH:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 2)
state = State.EXTRA;
break;
}
case EXTRA:
{
// Skip EXTRA bytes
--value;
if (value == 0)
{
// Clear the EXTRA flag and loop on the flags
flags &= ~0x04;
state = State.FLAGS;
}
break;
}
case NAME:
{
// Skip NAME bytes
if (currByte == 0)
{
// Clear the NAME flag and loop on the flags
flags &= ~0x08;
state = State.FLAGS;
}
break;
}
case COMMENT:
{
// Skip COMMENT bytes
if (currByte == 0)
{
// Clear the COMMENT flag and loop on the flags
flags &= ~0x10;
state = State.FLAGS;
}
break;
}
case HCRC:
{
// Skip HCRC
++size;
if (size == 2)
{
// Clear the HCRC flag and loop on the flags
flags &= ~0x02;
state = State.FLAGS;
}
break;
}
case DATA:
{
buffer.position(buffer.position() - 1);
while (true)
{
int decoded = inflate(bytes);
if (decoded == 0)
{
if (inflater.needsInput())
{
if (buffer.hasRemaining())
{
byte[] input = new byte[buffer.remaining()];
buffer.get(input);
inflater.setInput(input);
}
else
{
if (output != null)
{
ByteBuffer result = ByteBuffer.wrap(output);
output = null;
return result;
}
break;
}
}
else if (inflater.finished())
{
int remaining = inflater.getRemaining();
buffer.position(buffer.limit() - remaining);
state = State.CRC;
size = 0;
value = 0;
break;
}
else
{
throw new ZipException("Invalid inflater state");
}
}
else
{
if (output == null)
{
// Save the inflated bytes and loop to see if we have finished
output = Arrays.copyOf(bytes, decoded);
}
else
{
// Accumulate inflated bytes and loop to see if we have finished
byte[] newOutput = Arrays.copyOf(output, output.length + decoded);
System.arraycopy(bytes, 0, newOutput, output.length, decoded);
output = newOutput;
}
}
}
break;
}
case CRC:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 4)
{
// From RFC 1952, compliant decoders need not to verify the CRC
state = State.ISIZE;
size = 0;
value = 0;
}
break;
}
case ISIZE:
{
value += (currByte & 0xFF) << 8 * size;
++size;
if (size == 4)
{
if (value != inflater.getBytesWritten())
throw new ZipException("Invalid input size");
ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
reset();
return result;
}
break;
}
default:
throw new ZipException();
}
}
return BufferUtil.EMPTY_BUFFER;
}
catch (ZipException x)
{
throw new RuntimeException(x);
}
}
private int inflate(byte[] bytes) throws ZipException
{
try
{
return inflater.inflate(bytes);
}
catch (DataFormatException x)
{
throw new ZipException(x.getMessage());
}
}
private void reset()
{
inflater.reset();
Arrays.fill(bytes, (byte)0);
output = null;
state = State.INITIAL;
size = 0;
value = 0;
flags = 0;
}
protected boolean isFinished()
{
return state == State.INITIAL;
super(null,bufferSize);
}
/**
@ -351,9 +60,4 @@ public class GZIPContentDecoder implements ContentDecoder
return new GZIPContentDecoder(bufferSize);
}
}
private enum State
{
INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
}
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.Rule;
import org.junit.Test;
@Deprecated
public class GZIPContentDecoderTest
{
@Rule

View File

@ -18,6 +18,11 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -0,0 +1,416 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import java.util.zip.ZipException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
/**
* Decoder for the "gzip" encoding.
* <p>An Decode that inflates gzip compressed data that has been
* optimized for async usage with minimal data copies.
*
*/
public class GZIPContentDecoder
{
private final Inflater _inflater = new Inflater(true);
private final ByteBufferPool _pool;
private final int _bufferSize;
private State _state;
private int _size;
private int _value;
private byte _flags;
private ByteBuffer _inflated;
public GZIPContentDecoder()
{
this(null,2048);
}
public GZIPContentDecoder(int bufferSize)
{
this(null,bufferSize);
}
public GZIPContentDecoder(ByteBufferPool pool, int bufferSize)
{
_bufferSize = bufferSize;
_pool = pool;
reset();
}
/** Inflate compressed data from a buffer.
*
* @param compressed Buffer containing compressed data.
* @return Buffer containing inflated data.
*/
public ByteBuffer decode(ByteBuffer compressed)
{
decodeChunks(compressed);
if (BufferUtil.isEmpty(_inflated) || _state==State.CRC || _state==State.ISIZE )
return BufferUtil.EMPTY_BUFFER;
ByteBuffer result = _inflated;
_inflated = null;
return result;
}
/** Called when a chunk of data is inflated.
* <p>The default implementation aggregates all the chunks
* into a single buffer returned from {@link #decode(ByteBuffer)}.
* Derived implementations may choose to consume chunks individually
* and return false to prevent further inflation until a subsequent
* call to {@link #decode(ByteBuffer)} or {@link #decodeChunks(ByteBuffer)}.
*
* @param chunk The inflated chunk of data
* @return False if inflating should continue, or True if the call
* to {@link #decodeChunks(ByteBuffer)} or {@link #decode(ByteBuffer)}
* should return, allowing back pressure of compressed data.
*/
protected boolean decodedChunk(ByteBuffer chunk)
{
if (_inflated==null)
_inflated=chunk;
else
{
int size = _inflated.remaining() + chunk.remaining();
if (size<=_inflated.capacity())
{
BufferUtil.append(_inflated,chunk);
BufferUtil.put(chunk,_inflated);
release(chunk);
}
else
{
ByteBuffer bigger=_pool==null?BufferUtil.allocate(size):_pool.acquire(size,false);
int pos=BufferUtil.flipToFill(bigger);
BufferUtil.put(_inflated,bigger);
BufferUtil.put(chunk,bigger);
BufferUtil.flipToFlush(bigger,pos);
release(_inflated);
release(chunk);
_inflated = bigger;
}
}
return false;
}
/**
* Inflate compressed data.
* <p>Inflation continues until the compressed block end is reached, there is no
* more compressed data or a call to {@link #decodedChunk(ByteBuffer)} returns true.
* @param compressed Buffer of compressed data to inflate
*/
protected void decodeChunks(ByteBuffer compressed)
{
ByteBuffer buffer = null;
try
{
while (true)
{
switch (_state)
{
case INITIAL:
{
_state = State.ID;
break;
}
case FLAGS:
{
if ((_flags & 0x04) == 0x04)
{
_state = State.EXTRA_LENGTH;
_size = 0;
_value = 0;
}
else if ((_flags & 0x08) == 0x08)
_state = State.NAME;
else if ((_flags & 0x10) == 0x10)
_state = State.COMMENT;
else if ((_flags & 0x2) == 0x2)
{
_state = State.HCRC;
_size = 0;
_value = 0;
}
else
{
_state = State.DATA;
continue;
}
break;
}
case DATA:
{
while (true)
{
if (buffer==null)
buffer = acquire();
try
{
int length = _inflater.inflate(buffer.array(),buffer.arrayOffset(),buffer.capacity());
buffer.limit(length);
}
catch (DataFormatException x)
{
throw new ZipException(x.getMessage());
}
if (buffer.hasRemaining())
{
ByteBuffer chunk = buffer;
buffer = null;
if (decodedChunk(chunk))
return;
}
else if (_inflater.needsInput())
{
if (!compressed.hasRemaining())
return;
if (compressed.hasArray())
{
_inflater.setInput(compressed.array(),compressed.arrayOffset()+compressed.position(),compressed.remaining());
compressed.position(compressed.limit());
}
else
{
// TODO use the pool
byte[] input = new byte[compressed.remaining()];
compressed.get(input);
_inflater.setInput(input);
}
}
else if (_inflater.finished())
{
int remaining = _inflater.getRemaining();
compressed.position(compressed.limit() - remaining);
_state = State.CRC;
_size = 0;
_value = 0;
break;
}
}
continue;
}
default:
break;
}
if (!compressed.hasRemaining())
break;
byte currByte = compressed.get();
switch (_state)
{
case ID:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 2)
{
if (_value != 0x8B1F)
throw new ZipException("Invalid gzip bytes");
_state = State.CM;
}
break;
}
case CM:
{
if ((currByte & 0xFF) != 0x08)
throw new ZipException("Invalid gzip compression method");
_state = State.FLG;
break;
}
case FLG:
{
_flags = currByte;
_state = State.MTIME;
_size = 0;
_value = 0;
break;
}
case MTIME:
{
// Skip the 4 MTIME bytes
++_size;
if (_size == 4)
_state = State.XFL;
break;
}
case XFL:
{
// Skip XFL
_state = State.OS;
break;
}
case OS:
{
// Skip OS
_state = State.FLAGS;
break;
}
case EXTRA_LENGTH:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 2)
_state = State.EXTRA;
break;
}
case EXTRA:
{
// Skip EXTRA bytes
--_value;
if (_value == 0)
{
// Clear the EXTRA flag and loop on the flags
_flags &= ~0x04;
_state = State.FLAGS;
}
break;
}
case NAME:
{
// Skip NAME bytes
if (currByte == 0)
{
// Clear the NAME flag and loop on the flags
_flags &= ~0x08;
_state = State.FLAGS;
}
break;
}
case COMMENT:
{
// Skip COMMENT bytes
if (currByte == 0)
{
// Clear the COMMENT flag and loop on the flags
_flags &= ~0x10;
_state = State.FLAGS;
}
break;
}
case HCRC:
{
// Skip HCRC
++_size;
if (_size == 2)
{
// Clear the HCRC flag and loop on the flags
_flags &= ~0x02;
_state = State.FLAGS;
}
break;
}
case CRC:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 4)
{
// From RFC 1952, compliant decoders need not to verify the CRC
_state = State.ISIZE;
_size = 0;
_value = 0;
}
break;
}
case ISIZE:
{
_value += (currByte & 0xFF) << 8 * _size;
++_size;
if (_size == 4)
{
if (_value != _inflater.getBytesWritten())
throw new ZipException("Invalid input size");
// TODO ByteBuffer result = output == null ? BufferUtil.EMPTY_BUFFER : ByteBuffer.wrap(output);
reset();
return ;
}
break;
}
default:
throw new ZipException();
}
}
}
catch (ZipException x)
{
throw new RuntimeException(x);
}
finally
{
if (buffer!=null)
release(buffer);
}
}
private void reset()
{
_inflater.reset();
_state = State.INITIAL;
_size = 0;
_value = 0;
_flags = 0;
}
public boolean isFinished()
{
return _state == State.INITIAL;
}
private enum State
{
INITIAL, ID, CM, FLG, MTIME, XFL, OS, FLAGS, EXTRA_LENGTH, EXTRA, NAME, COMMENT, HCRC, DATA, CRC, ISIZE
}
/**
* @return An indirect buffer of the configured buffersize either from the pool or freshly allocated.
*/
public ByteBuffer acquire()
{
return _pool==null?BufferUtil.allocate(_bufferSize):_pool.acquire(_bufferSize,false);
}
/**
* Release an allocated buffer.
* <p>This method will called {@link ByteBufferPool#release(ByteBuffer)} if a buffer pool has
* been configured. This method should be called once for all buffers returned from {@link #decode(ByteBuffer)}
* or passed to {@link #decodedChunk(ByteBuffer)}.
* @param buffer The buffer to release.
*/
public void release(ByteBuffer buffer)
{
if (_pool!=null && buffer!=BufferUtil.EMPTY_BUFFER)
_pool.release(buffer);
}
}

View File

@ -0,0 +1,343 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class GZIPContentDecoderTest
{
@Rule
public final TestTracker tracker = new TestTracker();
ArrayByteBufferPool pool;
AtomicInteger buffers = new AtomicInteger(0);
@Before
public void beforeClass() throws Exception
{
buffers.set(0);
pool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
buffers.incrementAndGet();
return super.acquire(size,direct);
}
@Override
public void release(ByteBuffer buffer)
{
buffers.decrementAndGet();
super.release(buffer);
}
};
}
@After
public void afterClass() throws Exception
{
assertEquals(0,buffers.get());
}
@Test
public void testStreamNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read = input.read();
assertEquals(-1, read);
}
@Test
public void testStreamBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
baos = new ByteArrayOutputStream();
GZIPInputStream input = new GZIPInputStream(new ByteArrayInputStream(bytes), 1);
int read;
while ((read = input.read()) >= 0)
baos.write(read);
assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8));
}
@Test
public void testNoBlocks() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(0, decoded.remaining());
}
@Test
public void testSmallBlock() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The header is 10 bytes, chunk at 11 bytes
byte[] bytes1 = new byte[11];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 8 bytes, chunk the last 9 bytes
byte[] bytes1 = new byte[bytes.length - 9];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
assertFalse(decoder.isFinished());
decoder.release(decoded);
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(0, decoded.remaining());
assertTrue(decoder.isFinished());
decoder.release(decoded);
}
@Test
public void testSmallBlockWithGZIPTrailerChunked() throws Exception
{
String data = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// The trailer is 4+4 bytes, chunk the last 3 bytes
byte[] bytes1 = new byte[bytes.length - 3];
System.arraycopy(bytes, 0, bytes1, 0, bytes1.length);
byte[] bytes2 = new byte[bytes.length - bytes1.length];
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
assertEquals(0, decoded.capacity());
decoder.release(decoded);
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
decoder.release(decoded);
}
@Test
public void testTwoSmallBlocks() throws Exception
{
String data1 = "0";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "1";
baos = new ByteArrayOutputStream();
output = new GZIPOutputStream(baos);
output.write(data2.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes2 = baos.toByteArray();
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ByteBuffer decoded = decoder.decode(buffer);
assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertTrue(buffer.hasRemaining());
decoder.release(decoded);
decoded = decoder.decode(buffer);
assertEquals(data2, StandardCharsets.UTF_8.decode(decoded).toString());
assertTrue(decoder.isFinished());
assertFalse(buffer.hasRemaining());
decoder.release(decoded);
}
@Test
public void testBigBlock() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(pool,2048);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
}
assertEquals(data, result);
}
@Test
public void testBigBlockOneByteAtATime() throws Exception
{
String data = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(new byte[]{buffer.get()}));
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
}
assertEquals(data, result);
assertTrue(decoder.isFinished());
}
@Test
public void testBigBlockWithExtraBytes() throws Exception
{
String data1 = "0123456789ABCDEF";
for (int i = 0; i < 10; ++i)
data1 += data1;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data1.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes1 = baos.toByteArray();
String data2 = "HELLO";
byte[] bytes2 = data2.getBytes(StandardCharsets.UTF_8);
byte[] bytes = new byte[bytes1.length + bytes2.length];
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
String result = "";
GZIPContentDecoder decoder = new GZIPContentDecoder(64);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
if (decoded.hasRemaining())
result += StandardCharsets.UTF_8.decode(decoded).toString();
decoder.release(decoded);
if (decoder.isFinished())
break;
}
assertEquals(data1, result);
assertTrue(buffer.hasRemaining());
assertEquals(data2, StandardCharsets.UTF_8.decode(buffer).toString());
}
}

View File

@ -15,6 +15,8 @@
<Set name="minGzipSize"><Property name="jetty.gzip.minGzipSize" deprecated="gzip.minGzipSize" default="2048"/></Set>
<Set name="checkGzExists"><Property name="jetty.gzip.checkGzExists" deprecated="gzip.checkGzExists" default="false"/></Set>
<Set name="compressionLevel"><Property name="jetty.gzip.compressionLevel" deprecated="gzip.compressionLevel" default="-1"/></Set>
<Set name="inflateBufferSize"><Property name="jetty.gzip.inflateBufferSize" default="0/></Set>
<Set name="excludedAgentPatterns">
<Array type="String">
<Item><Property name="jetty.gzip.excludedUserAgent" deprecated="gzip.excludedUserAgent" default=".*MSIE.6\.0.*"/></Item>

View File

@ -20,3 +20,6 @@ etc/jetty-gzip.xml
## User agents for which gzip is disabled
# jetty.gzip.excludedUserAgent=.*MSIE.6\.0.*
## Inflate request buffer size, or 0 for no request inflation
# jetty.gzip.inflateBufferSize=0

View File

@ -25,6 +25,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -39,23 +40,87 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/**
* {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
* <p>
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
* maintains two states: the content state that tells whether there is content to consume and the EOF
* state that tells whether an EOF has arrived.
* Only once the content has been consumed the content state is moved to the EOF state.
* Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class maintains two states: the content state that tells
* whether there is content to consume and the EOF state that tells whether an EOF has arrived. Only once the content has been consumed the content state is
* moved to the EOF state.
*/
/**
* @author gregw
*
*/
public class HttpInput extends ServletInputStream implements Runnable
{
/**
* An interceptor for HTTP Request input.
* <p>
* Unlike inputstream wrappers that can be applied by filters, an interceptor
* is entirely transparent and works with async IO APIs.
* <p>
* An Interceptor may consume data from the passed content and the interceptor
* will continue to be called for the same content until the interceptor returns
* null or an empty content. Thus even if the passed content is completely consumed
* the interceptor will be called with the same content until it can no longer
* produce more content.
* @see HttpInput#setInterceptor(Interceptor)
* @see HttpInput#addInterceptor(Interceptor)
*/
public interface Interceptor
{
/**
* @param content The content to be intercepted (may be empty or a {@link SentinelContent}.
* The content will be modified with any data the interceptor consumes, but there is no requirement
* that all the data is consumed by the interceptor.
* @return The intercepted content or null if interception is completed for that content.
*/
Content readFrom(Content content);
}
/**
* An {@link Interceptor} that chains two other {@link Interceptor}s together.
* The {@link #readFrom(Content)} calls the previous {@link Interceptor}'s
* {@link #readFrom(Content)} and then passes any {@link Content} returned
* to the next {@link Interceptor}.
*/
public static class ChainedInterceptor implements Interceptor
{
private final Interceptor _prev;
private final Interceptor _next;
public ChainedInterceptor(Interceptor prev, Interceptor next)
{
_prev = prev;
_next = next;
}
public Interceptor getPrev()
{
return _prev;
}
public Interceptor getNext()
{
return _next;
}
@Override
public Content readFrom(Content content)
{
return _next.readFrom(_prev.readFrom(content));
}
}
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private Content _content;
private Content _intercepted;
private final Deque<Content> _inputQ = new ArrayDeque<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
@ -64,6 +129,7 @@ public class HttpInput extends ServletInputStream implements Runnable
private long _contentArrived;
private long _contentConsumed;
private long _blockUntil;
private Interceptor _interceptor;
public HttpInput(HttpChannelState state)
{
@ -79,6 +145,9 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
if (_content!=null)
_content.failed(null);
_content = null;
Content item = _inputQ.poll();
while (item != null)
{
@ -91,9 +160,40 @@ public class HttpInput extends ServletInputStream implements Runnable
_contentConsumed = 0;
_firstByteTimeStamp = -1;
_blockUntil = 0;
_interceptor = null;
}
}
/**
* @return The current Interceptor, or null if none set
*/
public Interceptor getInterceptor()
{
return _interceptor;
}
/**
* Set the interceptor.
* @param interceptor The interceptor to use.
*/
public void setInterceptor(Interceptor interceptor)
{
_interceptor = interceptor;
}
/**
* Set the {@link Interceptor}, using a {@link ChainedInterceptor} if
* an {@link Interceptor} is already set.
* @param interceptor the next {@link Interceptor} in a chain
*/
public void addInterceptor(Interceptor interceptor)
{
if (_interceptor == null)
_interceptor = interceptor;
else
_interceptor = new ChainedInterceptor(_interceptor,interceptor);
}
@Override
public int available()
{
@ -101,8 +201,9 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
Content content = _inputQ.peek();
if (content == null)
if (_content == null)
_content = _inputQ.poll();
if (_content == null)
{
try
{
@ -112,11 +213,12 @@ public class HttpInput extends ServletInputStream implements Runnable
{
woken = failed(e);
}
content = _inputQ.peek();
if (_content == null)
_content = _inputQ.poll();
}
if (content != null)
available = remaining(content);
if (_content != null)
available = _content.remaining();
}
if (woken)
@ -139,10 +241,10 @@ public class HttpInput extends ServletInputStream implements Runnable
@Override
public int read() throws IOException
{
int read = read(_oneByteBuffer, 0, 1);
int read = read(_oneByteBuffer,0,1);
if (read == 0)
throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
return read < 0?-1:_oneByteBuffer[0] & 0xFF;
}
@Override
@ -168,7 +270,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request data rate < %d B/s", minRequestDataRate));
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate));
}
}
@ -177,11 +279,12 @@ public class HttpInput extends ServletInputStream implements Runnable
Content item = nextContent();
if (item != null)
{
int l = get(item, b, off, len);
int l = get(item,b,off,len);
if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}", this, l, item);
LOG.debug("{} read {} from {}",this,l,item);
consumeNonContent();
// Consume any following poison pills
pollReadableContent();
return l;
}
@ -193,191 +296,207 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* Called when derived implementations should attempt to
* produce more Content and add it via {@link #addContent(Content)}.
* For protocols that are constantly producing (eg HTTP2) this can
* be left as a noop;
* Called when derived implementations should attempt to produce more Content and add it via {@link #addContent(Content)}. For protocols that are constantly
* producing (eg HTTP2) this can be left as a noop;
*
* @throws IOException if unable to produce content
* @throws IOException
* if unable to produce content
*/
protected void produceContent() throws IOException
{
}
/**
* Get the next content from the inputQ, calling {@link #produceContent()}
* if need be. EOF is processed and state changed.
* Get the next content from the inputQ, calling {@link #produceContent()} if need be. EOF is processed and state changed.
*
* @return the content or null if none available.
* @throws IOException if retrieving the content fails
* @throws IOException
* if retrieving the content fails
*/
protected Content nextContent() throws IOException
{
Content content = pollContent();
Content content = pollNonEmptyContent();
if (content == null && !isFinished())
{
produceContent();
content = pollContent();
content = pollNonEmptyContent();
}
return content;
}
/**
* Poll the inputQ for Content.
* Consumed buffers and {@link PoisonPillContent}s are removed and
* EOF state updated if need be.
* Poll the inputQ for Content. Consumed buffers and {@link SentinelContent}s are removed and EOF state updated if need be.
*
* @return Content or null
*/
protected Content pollContent()
protected Content pollNonEmptyContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
{
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
if (content == EOF_CONTENT)
while (true)
{
// Get the next content (or EOF)
Content content = pollReadableContent();
// If it is EOF, consume it here
if (content instanceof SentinelContent)
{
if (_listener == null)
_state = EOF;
else
if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
else if (content instanceof EofContent)
{
_state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
if (_listener == null)
_state = EOF;
else
{
_state = AEOF;
boolean woken = _channelState.onReadReady(); // force callback?
if (woken)
wake();
}
}
// Consume the EOF content, either if it was original content
// or if it was produced by interception
content.succeeded();
if (_content==content)
_content = null;
else if (_intercepted==content)
_intercepted = null;
continue;
}
else if (content == EARLY_EOF_CONTENT)
_state = EARLY_EOF;
content = _inputQ.peek();
return content;
}
return content;
}
/**
* Poll the inputQ for Content or EOF. Consumed buffers and non EOF {@link SentinelContent}s are removed. EOF state is not updated.
* Interception is done within this method.
* @return Content with remaining, a {@link SentinelContent}, or null
*/
protected void consumeNonContent()
protected Content pollReadableContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
// If we have a chunk produced by interception
if (_intercepted!=null)
{
// Defer EOF until read
if (content instanceof EofContent)
break;
// Consume all other empty content
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
// Use it if it has any remaining content
if (_intercepted.hasContent())
return _intercepted;
// succeed the chunk
_intercepted.succeeded();
_intercepted=null;
}
// If we don't have a Content under consideration, get
// the next one off the input Q.
if (_content == null)
_content = _inputQ.poll();
// While we have content to consider.
while (_content!=null)
{
// Are we intercepting?
if (_interceptor!=null)
{
// Intercept the current content (may be called several
// times for the same content
_intercepted = _interceptor.readFrom(_content);
// If interception produced new content
if (_intercepted!=null && _intercepted!=_content)
{
// if it is not empty use it
if (_intercepted.hasContent())
return _intercepted;
_intercepted.succeeded();
}
// intercepted content consumed
_intercepted=null;
// fall through so that the unintercepted _content is
// considered for any remaining content, for EOF and to
// succeed it if it is entirely consumed.
}
// If the content has content or is an EOF marker, use it
if (_content.hasContent() || _content instanceof SentinelContent)
return _content;
// The content is consumed, so get the next one. Note that EOF
// content is never consumed here, but in #pollContent
_content.succeeded();
_content = _inputQ.poll();
}
return null;
}
/**
* Get the next readable from the inputQ, calling {@link #produceContent()}
* if need be. EOF is NOT processed and state is not changed.
* Get the next readable from the inputQ, calling {@link #produceContent()} if need be. EOF is NOT processed and state is not changed.
*
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
* @throws IOException
* if retrieving the content fails
*/
protected Content nextReadable() throws IOException
{
Content content = pollReadable();
Content content = pollReadableContent();
if (content == null && !isFinished())
{
produceContent();
content = pollReadable();
content = pollReadableContent();
}
return content;
}
/**
* Poll the inputQ for Content or EOF.
* Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
* EOF state is not updated.
*
* @return Content, EOF or null
*/
protected Content pollReadable()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peek();
// Skip consumed items at the head of the queue except EOF
while (content != null)
{
if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0)
return content;
_inputQ.poll();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peek();
}
return null;
}
/**
* @param item the content
* @return how many bytes remain in the given content
*/
protected int remaining(Content item)
{
return item.remaining();
}
/**
* Copies the given content into the given byte buffer.
*
* @param content the content to copy from
* @param buffer the buffer to copy into
* @param offset the buffer offset to start copying from
* @param length the space available in the buffer
* @param content
* the content to copy from
* @param buffer
* the buffer to copy into
* @param offset
* the buffer offset to start copying from
* @param length
* the space available in the buffer
* @return the number of bytes actually copied
*/
protected int get(Content content, byte[] buffer, int offset, int length)
{
int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l);
int l = content.get(buffer,offset,length);
_contentConsumed += l;
return l;
}
/**
* Consumes the given content.
* Calls the content succeeded if all content consumed.
* Consumes the given content. Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
* @param content
* the content to consume
* @param length
* the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = Math.min(content.remaining(), length);
ByteBuffer buffer = content.getContent();
buffer.position(buffer.position() + l);
int l = content.skip(length);
_contentConsumed += l;
if (l > 0 && !content.hasContent())
pollContent(); // hungry succeed
if (l > 0 && content.isEmpty())
pollNonEmptyContent(); // hungry succeed
}
/**
* Blocks until some content or some end-of-file event arrives.
*
* @throws IOException if the wait is interrupted
* @throws IOException
* if the wait is interrupted
*/
protected void blockForContent() throws IOException
{
@ -392,7 +511,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}", this, timeout);
LOG.debug("{} blocking for content timeout={}",this,timeout);
if (timeout > 0)
_inputQ.wait(timeout);
else
@ -402,7 +521,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout()));
}
catch (Throwable e)
{
@ -412,11 +531,12 @@ public class HttpInput extends ServletInputStream implements Runnable
/**
* Adds some content to the start of this input stream.
* <p>Typically used to push back content that has
* been read, perhaps mutated. The bytes prepended are
* deducted for the contentConsumed total</p>
* <p>
* Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total
* </p>
*
* @param item the content to add
* @param item
* the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
@ -424,44 +544,54 @@ public class HttpInput extends ServletInputStream implements Runnable
boolean woken = false;
synchronized (_inputQ)
{
_inputQ.push(item);
if (_content != null)
_inputQ.push(_content);
_content = item;
_contentConsumed -= item.remaining();
if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}", this, item);
LOG.debug("{} prependContent {}",this,item);
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onReadPossible();
}
return woken;
}
/**
* Adds some content to this input stream.
*
* @param item the content to add
* @param content
* the content to add
* @return true if content channel woken for read
*/
public boolean addContent(Content item)
public boolean addContent(Content content)
{
boolean woken = false;
synchronized (_inputQ)
{
if (_firstByteTimeStamp == -1)
_firstByteTimeStamp = System.nanoTime();
_contentArrived += item.remaining();
_inputQ.offer(item);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, item);
if (_listener == null)
_inputQ.notify();
_contentArrived += content.remaining();
if (_content==null && _inputQ.isEmpty())
_content=content;
else
woken = _channelState.onReadPossible();
}
_inputQ.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}",this,content);
if (pollReadableContent()!=null)
{
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onReadPossible();
}
}
return woken;
}
@ -469,7 +599,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
synchronized (_inputQ)
{
return _inputQ.size() > 0;
return _content!=null || _inputQ.size() > 0;
}
}
@ -490,11 +620,9 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* This method should be called to signal that an EOF has been
* detected before all the expected content arrived.
* This method should be called to signal that an EOF has been detected before all the expected content arrived.
* <p>
* Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return.
* Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return.
*
* @return true if content channel woken for read
*/
@ -504,8 +632,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/**
* This method should be called to signal that all the expected
* content arrived.
* This method should be called to signal that all the expected content arrived.
*
* @return true if content channel woken for read
*/
@ -526,7 +653,7 @@ public class HttpInput extends ServletInputStream implements Runnable
if (item == null)
break; // Let's not bother blocking
skip(item, remaining(item));
skip(item,item.remaining());
}
return isFinished() && !isError();
}
@ -641,11 +768,8 @@ public class HttpInput extends ServletInputStream implements Runnable
}
/*
* <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
* runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
* to setup classloaders etc.
* </p>
* <p> While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link
* ContextHandler#handle(Runnable)} to setup classloaders etc. </p>
*/
@Override
public void run()
@ -666,7 +790,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
listener = _listener;
error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
}
try
@ -721,19 +845,24 @@ public class HttpInput extends ServletInputStream implements Runnable
content = _inputQ.peekFirst();
}
return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]",
getClass().getSimpleName(),
hashCode(),
consumed,
q,
content,
state);
getClass().getSimpleName(),
hashCode(),
consumed,
q,
content,
state);
}
public static class PoisonPillContent extends Content
/**
* A Sentinel Content, which has zero length content but
* indicates some other event in the input stream (eg EOF)
*
*/
public static class SentinelContent extends Content
{
private final String _name;
public PoisonPillContent(String name)
public SentinelContent(String name)
{
super(BufferUtil.EMPTY_BUFFER);
_name = name;
@ -746,7 +875,7 @@ public class HttpInput extends ServletInputStream implements Runnable
}
}
public static class EofContent extends PoisonPillContent
public static class EofContent extends SentinelContent
{
EofContent(String name)
{
@ -756,22 +885,36 @@ public class HttpInput extends ServletInputStream implements Runnable
public static class Content implements Callback
{
private final ByteBuffer _content;
protected final ByteBuffer _content;
public Content(ByteBuffer content)
{
_content = content;
}
public ByteBuffer getByteBuffer()
{
return _content;
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
public ByteBuffer getContent()
public int get(byte[] buffer, int offset, int length)
{
return _content;
length = Math.min(_content.remaining(),length);
_content.get(buffer,offset,length);
return length;
}
public int skip(int length)
{
length = Math.min(_content.remaining(),length);
_content.position(_content.position() + length);
return length;
}
public boolean hasContent()
@ -783,15 +926,19 @@ public class HttpInput extends ServletInputStream implements Runnable
{
return _content.remaining();
}
public boolean isEmpty()
{
return !_content.hasRemaining();
}
@Override
public String toString()
{
return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content));
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
}
}
protected static abstract class State
{
public boolean blockForContent(HttpInput in) throws IOException

View File

@ -175,9 +175,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _interceptor;
}
public void setInterceptor(Interceptor filter)
public void setInterceptor(Interceptor interceptor)
{
_interceptor = filter;
_interceptor = interceptor;
}
public boolean isWritten()

View File

@ -65,6 +65,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private int _compressionLevel=Deflater.DEFAULT_COMPRESSION;
private boolean _checkGzExists = true;
private boolean _syncFlush = false;
private int _inflateBufferSize = -1;
// non-static, as other GzipHandler instances may have different configurations
private final ThreadLocal<Deflater> _deflater = new ThreadLocal<>();
@ -77,6 +78,7 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
private HttpField _vary;
/* ------------------------------------------------------------ */
/**
* Instantiates a new gzip handler.
@ -398,6 +400,24 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
return _vary;
}
/* ------------------------------------------------------------ */
/**
* @return size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
*/
public int getInflateBufferSize()
{
return _inflateBufferSize;
}
/* ------------------------------------------------------------ */
/**
* @param size size in bytes of the buffer to inflate compressed request, or 0 for no inflation.
*/
public void setInflateBufferSize(int size)
{
_inflateBufferSize = size;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
@ -409,6 +429,19 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
String path = context==null?baseRequest.getRequestURI():URIUtil.addPaths(baseRequest.getServletPath(),baseRequest.getPathInfo());
LOG.debug("{} handle {} in {}",this,baseRequest,context);
// Handle request inflation
if (_inflateBufferSize>0)
{
HttpField ce = baseRequest.getHttpFields().getField(HttpHeader.CONTENT_ENCODING);
if (ce!=null && "gzip".equalsIgnoreCase(ce.getValue()))
{
// TODO should check ce.contains and then remove just the gzip encoding
baseRequest.getHttpFields().remove(HttpHeader.CONTENT_ENCODING);
baseRequest.getHttpFields().add(new HttpField("X-Content-Encoding",ce.getValue()));
baseRequest.getHttpInput().addInterceptor(new GzipHttpInputInterceptor(baseRequest.getHttpChannel().getByteBufferPool(),_inflateBufferSize));
}
}
HttpOutput out = baseRequest.getResponse().getHttpOutput();
// Are we already being gzipped?
HttpOutput.Interceptor interceptor = out.getInterceptor();

View File

@ -0,0 +1,83 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.handler.gzip;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.GZIPContentDecoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpInput.Content;
/**
* A HttpInput Interceptor that inflates GZIP encoded request content.
*
*/
public class GzipHttpInputInterceptor implements HttpInput.Interceptor
{
class Decoder extends GZIPContentDecoder
{
public Decoder(ByteBufferPool pool, int bufferSize)
{
super(pool,bufferSize);
}
@Override
protected boolean decodedChunk(final ByteBuffer chunk)
{
_chunk = chunk;
return true;
}
@Override
public void decodeChunks(ByteBuffer compressed)
{
_chunk = null;
super.decodeChunks(compressed);
}
}
private final Decoder _decoder;
private ByteBuffer _chunk;
public GzipHttpInputInterceptor(ByteBufferPool pool, int bufferSize)
{
_decoder = new Decoder(pool,bufferSize);
}
@Override
public Content readFrom(Content content)
{
_decoder.decodeChunks(content.getByteBuffer());
final ByteBuffer chunk = _chunk;
if (chunk==null)
return null;
return new Content(chunk)
{
@Override
public void succeeded()
{
_decoder.release(chunk);
}
};
}
}

View File

@ -354,6 +354,7 @@ public class AsyncRequestReadTest
for (int i=read;i-->0;)
{
int c=in.read();
// System.err.println("in="+c);
if (c<0)
break;
out.write(c);

View File

@ -33,8 +33,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@ -45,6 +47,7 @@ import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.After;
@ -86,6 +89,7 @@ public class GzipHandlerTest
GzipHandler gzipHandler = new GzipHandler();
gzipHandler.setExcludedAgentPatterns();
gzipHandler.setMinGzipSize(16);
gzipHandler.setInflateBufferSize(4096);
ServletContextHandler context = new ServletContextHandler(gzipHandler,"/ctx");
ServletHandler servlets = context.getServletHandler();
@ -97,6 +101,7 @@ public class GzipHandlerTest
servlets.addServletWithMapping(TestServlet.class,"/content");
servlets.addServletWithMapping(ForwardServlet.class,"/forward");
servlets.addServletWithMapping(IncludeServlet.class,"/include");
servlets.addServletWithMapping(EchoServlet.class,"/echo/*");
_server.start();
}
@ -147,6 +152,21 @@ public class GzipHandlerTest
}
}
}
public static class EchoServlet extends HttpServlet
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
{
response.setContentType(req.getContentType());
IO.copy(req.getInputStream(),response.getOutputStream());
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException
{
doGet(req,response);
}
}
public static class ForwardServlet extends HttpServlet
{
@ -392,4 +412,68 @@ public class GzipHandlerTest
assertThat("Included Paths.size", includedPaths.length, is(2));
assertThat("Included Paths", Arrays.asList(includedPaths), contains("/foo","^/bar.*$"));
}
@Test
public void testGzipRequest() throws Exception
{
String data = "Hello Nice World! ";
for (int i = 0; i < 10; ++i)
data += data;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data.getBytes(StandardCharsets.UTF_8));
output.close();
byte[] bytes = baos.toByteArray();
// generated and parsed test
HttpTester.Request request = HttpTester.newRequest();
HttpTester.Response response;
request.setMethod("POST");
request.setURI("/ctx/echo");
request.setVersion("HTTP/1.0");
request.setHeader("Host","tester");
request.setHeader("Content-Type","text/plain");
request.setHeader("Content-Encoding","gzip");
request.setContent(bytes);
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
assertThat(response.getStatus(),is(200));
assertThat(response.getContent(),is(data));
}
@Test
public void testGzipBomb() throws Exception
{
byte[] data = new byte[512*1024];
Arrays.fill(data,(byte)'X');
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream output = new GZIPOutputStream(baos);
output.write(data);
output.close();
byte[] bytes = baos.toByteArray();
// generated and parsed test
HttpTester.Request request = HttpTester.newRequest();
HttpTester.Response response;
request.setMethod("POST");
request.setURI("/ctx/echo");
request.setVersion("HTTP/1.0");
request.setHeader("Host","tester");
request.setHeader("Content-Type","text/plain");
request.setHeader("Content-Encoding","gzip");
request.setContent(bytes);
response = HttpTester.parseResponse(_connector.getResponse(request.generate()));
// TODO need to test back pressure works
assertThat(response.getStatus(),is(200));
assertThat(response.getContentBytes().length,is(512*1024));
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
@ -58,9 +59,12 @@ import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
@ -68,6 +72,8 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@ -1038,4 +1044,174 @@ public class AsyncIOServletTest extends AbstractTest
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncIntercepted() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
System.err.println("Service "+request);
final HttpInput httpInput = ((Request)request).getHttpInput();
httpInput.addInterceptor(new HttpInput.Interceptor()
{
int state = 0;
Content saved;
@Override
public Content readFrom(Content content)
{
// System.err.printf("readFrom s=%d saved=%b %s%n",state,saved!=null,content);
switch(state)
{
case 0:
// null transform
if (content.isEmpty())
state++;
return null;
case 1:
{
// copy transform
if (content.isEmpty())
{
state++;
return content;
}
ByteBuffer copy = wrap(toArray(content.getByteBuffer()));
content.skip(copy.remaining());
return new Content(copy);
}
case 2:
// byte by byte
if (content.isEmpty())
{
state++;
return content;
}
byte[] b = new byte[1];
int l = content.get(b,0,1);
return new Content(wrap(b,0,l));
case 3:
{
// double vision
if (content.isEmpty())
{
if (saved==null)
{
state++;
return content;
}
Content copy = saved;
saved=null;
return copy;
}
byte[] data = toArray(content.getByteBuffer());
content.skip(data.length);
saved = new Content(wrap(data));
return new Content(wrap(data));
}
default:
return null;
}
}
});
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
ByteArrayOutputStream out = new ByteArrayOutputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady() && !input.isFinished())
{
int b = input.read();
if (b>0)
{
// System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b));
out.write(b);
}
else
onAllDataRead();
}
}
@Override
public void onAllDataRead() throws IOException
{
response.getOutputStream().write(out.toByteArray());
asyncContext.complete();
}
@Override
public void onError(Throwable x)
{
}
});
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"S0" +
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";
client.newRequest(newURI())
.method(HttpMethod.POST)
.path(servletPath)
.content(contentProvider)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
{
Response response = result.getResponse();
assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200));
assertThat(getContentAsString(), Matchers.equalTo(expected));
clientLatch.countDown();
}
}
});
contentProvider.offer(BufferUtil.toBuffer("S0"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S1"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S2"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S3"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S4"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S5"));
contentProvider.flush();
contentProvider.offer(BufferUtil.toBuffer("S6"));
contentProvider.close();
Assert.assertTrue(clientLatch.await(10,TimeUnit.SECONDS));
}
}