Fix #4461 HttpOutput Aggregation (#4466)

* Issue #4461 HttpOutput Aggregation

Added tests to check that aggregation continues after first flush of an aggregated buffer (this triggers both #4461 and the discovered bug of not aggregating because of empty at capacity aggregate buffer).

Added getAggregateSize method that does a compact to avoid empty at capacity aggregate buffer

Call onWriteComplete if residue of an overflow aggregation can itself be aggregated.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4461 HttpOutput Aggregation

Removed implicit compact from GzipHandler

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4461 HttpOutput Aggregation

Improve test coverage

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4461 HttpOutput Aggregation

Remove case that can never happen.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4461 HttpOutput Aggregation

updates from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2020-01-10 07:20:54 +11:00 committed by GitHub
parent 69808d3851
commit 96f6f2bb8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 377 additions and 25 deletions

View File

@ -378,6 +378,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return wake;
}
private int maximizeAggregateSpace()
{
// If no aggregate, we can allocate one of bufferSize
if (_aggregate == null)
return getBufferSize();
// compact to maximize space
BufferUtil.compact(_aggregate);
return BufferUtil.space(_aggregate);
}
public void softClose()
{
synchronized (_channelState)
@ -723,6 +735,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void write(byte[] b, int off, int len) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("write(array {})", BufferUtil.toDetailString(ByteBuffer.wrap(b, off, len)));
boolean last;
boolean aggregate;
boolean flush;
@ -733,7 +748,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
checkWritable();
long written = _written + len;
int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate);
int space = maximizeAggregateSpace();
last = _channel.getResponse().isAllContentWritten(written);
// Write will be aggregated if:
// + it is smaller than the commitSize
@ -777,7 +792,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// return if we are not complete, not full and filled all the content
if (!flush)
{
if (LOG.isDebugEnabled())
LOG.debug("write(array) {} aggregated !flush {}",
stateString(), BufferUtil.toDetailString(_aggregate));
return;
}
// adjust offset/length
off += filled;
@ -785,6 +805,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
}
if (LOG.isDebugEnabled())
LOG.debug("write(array) {} last={} agg={} flush=true async={}, len={} {}",
stateString(), last, aggregate, async, len, BufferUtil.toDetailString(_aggregate));
if (async)
{
// Do the asynchronous writing from the callback
@ -801,9 +825,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
channelWrite(_aggregate, last && len == 0);
// should we fill aggregate again from the buffer?
if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
if (len > 0 && !last && len <= _commitSize && len <= maximizeAggregateSpace())
{
BufferUtil.append(_aggregate, b, off, len);
onWriteComplete(false, null);
return;
}
}
@ -929,7 +954,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
checkWritable();
long written = _written + 1;
int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate);
int space = maximizeAggregateSpace();
last = _channel.getResponse().isAllContentWritten(written);
flush = last || space == 1;
@ -1602,7 +1627,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final ByteBuffer _buffer;
private final ByteBuffer _slice;
private final int _len;
volatile boolean _completed;
private boolean _completed;
AsyncWrite(byte[] b, int off, int len, boolean last)
{
@ -1639,7 +1664,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
// Can we just aggregate the remainder?
if (!_last && _len < BufferUtil.space(_aggregate) && _len < _commitSize)
if (!_last && _aggregate != null && _len < maximizeAggregateSpace() && _len < _commitSize)
{
int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer, _aggregate);
@ -1846,32 +1871,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private class WriteCompleteCB implements Callback
{
final Callback _callback;
WriteCompleteCB()
{
this(null);
}
WriteCompleteCB(Callback callback)
{
_callback = callback;
}
@Override
public void succeeded()
{
onWriteComplete(true, null);
if (_callback != null)
_callback.succeeded();
}
@Override
public void failed(Throwable x)
{
onWriteComplete(true, x);
if (_callback != null)
_callback.succeeded();
}
}
}

View File

@ -381,7 +381,7 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
int len = slice.remaining();
_crc.update(array, off, len);
_deflater.setInput(array, off, len); // TODO use ByteBuffer API in Jetty-10
BufferUtil.clear(slice);
slice.position(slice.position() + len);
if (_last && BufferUtil.isEmpty(_content))
_deflater.finish();
}

View File

@ -18,11 +18,14 @@
package org.eclipse.jetty.server;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
@ -31,6 +34,7 @@ import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpOutput.Interceptor;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -54,6 +58,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class HttpOutputTest
{
public static final int OUTPUT_AGGREGATION_SIZE = 1024;
public static final int OUTPUT_BUFFER_SIZE = 4096;
private Server _server;
private LocalConnector _connector;
private ContentHandler _handler;
@ -64,10 +70,25 @@ public class HttpOutputTest
{
_server = new Server();
_server.addBean(new ByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
}
@Override
public void release(ByteBuffer buffer)
{
}
});
HttpConnectionFactory http = new HttpConnectionFactory();
http.getHttpConfiguration().setRequestHeaderSize(1024);
http.getHttpConfiguration().setResponseHeaderSize(1024);
http.getHttpConfiguration().setOutputBufferSize(4096);
http.getHttpConfiguration().setOutputBufferSize(OUTPUT_BUFFER_SIZE);
http.getHttpConfiguration().setOutputAggregationSize(OUTPUT_AGGREGATION_SIZE);
_connector = new LocalConnector(_server, http, null);
_server.addConnector(_connector);
@ -676,6 +697,330 @@ public class HttpOutputTest
assertThat(response, containsString("400\tTHIS IS A BIGGER FILE"));
}
@Test
public void testAggregation() throws Exception
{
AggregateHandler handler = new AggregateHandler();
_swap.setHandler(handler);
handler.start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString(handler.expected.toString()));
}
static class AggregateHandler extends AbstractHandler
{
ByteArrayOutputStream expected = new ByteArrayOutputStream();
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
HttpOutput out = (HttpOutput)response.getOutputStream();
// Add interceptor to check aggregation is done
HttpOutput.Interceptor interceptor = out.getInterceptor();
out.setInterceptor(new AggregationChecker(interceptor));
int bufferSize = baseRequest.getHttpChannel().getHttpConfiguration().getOutputBufferSize();
int len = bufferSize * 3 / 2;
byte[] data = new byte[AggregationChecker.MAX_SIZE];
int fill = 0;
while (expected.size() < len)
{
Arrays.fill(data, (byte)('A' + (fill++ % 26)));
expected.write(data);
out.write(data);
}
}
}
@Test
public void testAsyncAggregation() throws Exception
{
AsyncAggregateHandler handler = new AsyncAggregateHandler();
_swap.setHandler(handler);
handler.start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString(handler.expected.toString()));
}
static class AsyncAggregateHandler extends AbstractHandler
{
ByteArrayOutputStream expected = new ByteArrayOutputStream();
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
HttpOutput out = (HttpOutput)response.getOutputStream();
// Add interceptor to check aggregation is done
HttpOutput.Interceptor interceptor = out.getInterceptor();
out.setInterceptor(new AggregationChecker(interceptor));
int bufferSize = baseRequest.getHttpChannel().getHttpConfiguration().getOutputBufferSize();
int len = bufferSize * 3 / 2;
AsyncContext async = request.startAsync();
out.setWriteListener(new WriteListener()
{
int fill = 0;
@Override
public void onWritePossible() throws IOException
{
byte[] data = new byte[AggregationChecker.MAX_SIZE];
while (out.isReady())
{
if (expected.size() >= len)
{
async.complete();
return;
}
Arrays.fill(data, (byte)('A' + (fill++ % 26)));
expected.write(data);
out.write(data);
}
}
@Override
public void onError(Throwable t)
{
}
});
}
}
private static class AggregationChecker implements Interceptor
{
static final int MAX_SIZE = OUTPUT_AGGREGATION_SIZE / 2 - 1;
private final Interceptor interceptor;
public AggregationChecker(Interceptor interceptor)
{
this.interceptor = interceptor;
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (content.remaining() <= MAX_SIZE)
throw new IllegalStateException("Not Aggregated!");
interceptor.write(content, last, callback);
}
@Override
public Interceptor getNextInterceptor()
{
return interceptor;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return interceptor.isOptimizedForDirectBuffers();
}
}
@Test
public void testAggregateResidue() throws Exception
{
AggregateResidueHandler handler = new AggregateResidueHandler();
_swap.setHandler(handler);
handler.start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString(handler.expected.toString()));
}
static class AggregateResidueHandler extends AbstractHandler
{
ByteArrayOutputStream expected = new ByteArrayOutputStream();
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
HttpOutput out = (HttpOutput)response.getOutputStream();
int bufferSize = baseRequest.getHttpChannel().getHttpConfiguration().getOutputBufferSize();
int commitSize = baseRequest.getHttpChannel().getHttpConfiguration().getOutputAggregationSize();
char fill = 'A';
// write data that will be aggregated
byte[] data = new byte[commitSize - 1];
Arrays.fill(data, (byte)(fill++));
expected.write(data);
out.write(data);
int aggregated = data.length;
// write data that will almost fill the aggregate buffer
while (aggregated < (bufferSize - 1))
{
data = new byte[Math.min(commitSize - 1, bufferSize - aggregated - 1)];
Arrays.fill(data, (byte)(fill++));
expected.write(data);
out.write(data);
aggregated += data.length;
}
// write data that will not be aggregated
data = new byte[bufferSize + 1];
Arrays.fill(data, (byte)(fill++));
expected.write(data);
out.write(data);
}
}
@Test
public void testPrint() throws Exception
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
PrintWriter exp = new PrintWriter(bout);
_swap.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setCharacterEncoding("UTF8");
HttpOutput out = (HttpOutput)response.getOutputStream();
exp.print("\u20AC\u0939\uD55C");
out.print("\u20AC\u0939\uD55C");
exp.print("zero");
out.print("zero");
exp.print(1);
out.print(1);
exp.print(2L);
out.print(2L);
exp.print(3.0F);
out.print(3.0F);
exp.print('4');
out.print('4');
exp.print(5.0D);
out.print(5.0D);
exp.print(true);
out.print(true);
exp.println("zero");
out.println("zero");
exp.println(-1);
out.println(-1);
exp.println(-2L);
out.println(-2L);
exp.println(-3.0F);
out.println(-3.0F);
exp.println('4');
out.println('4');
exp.println(-5.0D);
out.println(-5.0D);
exp.println(false);
out.println(false);
}
});
_swap.getHandler().start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString(bout.toString()));
}
@Test
public void testReset() throws Exception
{
ByteArrayOutputStream exp = new ByteArrayOutputStream();
_swap.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
HttpOutput out = (HttpOutput)response.getOutputStream();
Interceptor interceptor = out.getInterceptor();
out.setInterceptor(new Interceptor()
{
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
interceptor.write(content, last, callback);
}
@Override
public Interceptor getNextInterceptor()
{
return interceptor;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return interceptor.isOptimizedForDirectBuffers();
}
});
out.setBufferSize(128);
out.println("NOT TO BE SEEN!");
out.resetBuffer();
byte[] data = "TO BE SEEN\n".getBytes(StandardCharsets.ISO_8859_1);
exp.write(data);
out.write(data);
out.flush();
data = "Not reset after flush\n".getBytes(StandardCharsets.ISO_8859_1);
exp.write(data);
try
{
out.resetBuffer();
}
catch (IllegalStateException e)
{
out.write(data);
}
}
});
_swap.getHandler().start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
assertThat(response, containsString(exp.toString()));
}
@Test
public void testZeroLengthWrite() throws Exception
{
_swap.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(0);
AsyncContext async = request.startAsync();
response.getOutputStream().setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
{
response.getOutputStream().write(new byte[0]);
async.complete();
}
@Override
public void onError(Throwable t)
{
}
});
}
});
_swap.getHandler().start();
String response = _connector.getResponse("GET / HTTP/1.0\nHost: localhost:80\n\n");
assertThat(response, containsString("HTTP/1.1 200 OK"));
}
private static String toUTF8String(Resource resource)
throws IOException
{
@ -688,8 +1033,6 @@ public class HttpOutputTest
{
}
;
void setNext(Interceptor interceptor);
}