Merge pull request #6194 from eclipse/jetty-9.4.x-FileBufferedResponseHandler

Create FileBufferedResponseHandler to buffer responses into a file.
This commit is contained in:
Lachlan 2021-04-21 11:05:55 +10:00 committed by GitHub
commit a4124b43a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1070 additions and 170 deletions

View File

@ -325,8 +325,9 @@ public class HttpStatus
switch (status)
{
case NO_CONTENT_204:
case NOT_MODIFIED_304:
case RESET_CONTENT_205:
case PARTIAL_CONTENT_206:
case NOT_MODIFIED_304:
return true;
default:

View File

@ -47,6 +47,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
static final InetAddress NOIP;
static final InetSocketAddress NOIPPORT;
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024;
static
{
@ -80,6 +81,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
private final Locker _locker = new Locker();
private final Condition _hasOutput = _locker.newCondition();
private final Queue<ByteBuffer> _inQ = new ArrayDeque<>();
private final int _outputSize;
private ByteBuffer _out;
private boolean _growOutput;
@ -129,7 +131,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
super(timer);
if (BufferUtil.hasContent(input))
addInput(input);
_out = output == null ? BufferUtil.allocate(1024) : output;
_outputSize = (output == null) ? 1024 : output.capacity();
_out = output == null ? BufferUtil.allocate(_outputSize) : output;
setIdleTimeout(idleTimeoutMs);
onOpen();
}
@ -296,7 +299,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
try (Locker.Lock lock = _locker.lock())
{
b = _out;
_out = BufferUtil.allocate(b.capacity());
_out = BufferUtil.allocate(_outputSize);
}
getWriteFlusher().completeWrite();
return b;
@ -322,7 +325,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
return null;
}
b = _out;
_out = BufferUtil.allocate(b.capacity());
_out = BufferUtil.allocate(_outputSize);
}
getWriteFlusher().completeWrite();
return b;
@ -436,9 +439,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
BufferUtil.compact(_out);
if (b.remaining() > BufferUtil.space(_out))
{
ByteBuffer n = BufferUtil.allocate(_out.capacity() + b.remaining() * 2);
BufferUtil.append(n, _out);
_out = n;
// Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues.
if (_out.capacity() < MAX_BUFFER_SIZE)
{
long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE);
ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity));
BufferUtil.append(n, _out);
_out = n;
}
}
}

View File

@ -642,6 +642,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
catch (Throwable t)
{
onWriteComplete(true, t);
throw t;
}
}
}

View File

@ -20,14 +20,15 @@ package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.pathmap.PathSpecSet;
import org.eclipse.jetty.server.HttpChannel;
@ -45,16 +46,17 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Buffered Response Handler
* <p>
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
* mechanism to buffer the entire response content until the output is closed.
* This allows the commit to be delayed until the response is complete and thus
* headers and response status can be changed while writing the body.
* </p>
* <p>
* Note that the decision to buffer is influenced by the headers and status at the
* first write, and thus subsequent changes to those headers will not influence the
* decision to buffer or not.
* </p>
* <p>
* Note also that there are no memory limits to the size of the buffer, thus
* this handler can represent an unbounded memory commitment if the content
@ -63,7 +65,7 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class BufferedResponseHandler extends HandlerWrapper
{
static final Logger LOG = Log.getLogger(BufferedResponseHandler.class);
private static final Logger LOG = Log.getLogger(BufferedResponseHandler.class);
private final IncludeExclude<String> _methods = new IncludeExclude<>();
private final IncludeExclude<String> _paths = new IncludeExclude<>(PathSpecSet.class);
@ -71,10 +73,7 @@ public class BufferedResponseHandler extends HandlerWrapper
public BufferedResponseHandler()
{
// include only GET requests
_methods.include(HttpMethod.GET.asString());
// Exclude images, aduio and video from buffering
for (String type : MimeTypes.getKnownMimeTypes())
{
if (type.startsWith("image/") ||
@ -82,7 +81,9 @@ public class BufferedResponseHandler extends HandlerWrapper
type.startsWith("video/"))
_mimeTypes.exclude(type);
}
LOG.debug("{} mime types {}", this, _mimeTypes);
if (LOG.isDebugEnabled())
LOG.debug("{} mime types {}", this, _mimeTypes);
}
public IncludeExclude<String> getMethodIncludeExclude()
@ -100,69 +101,6 @@ public class BufferedResponseHandler extends HandlerWrapper
return _mimeTypes;
}
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
*/
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
ServletContext context = baseRequest.getServletContext();
String path = context == null ? baseRequest.getRequestURI() : URIUtil.addPaths(baseRequest.getServletPath(), baseRequest.getPathInfo());
LOG.debug("{} handle {} in {}", this, baseRequest, context);
HttpOutput out = baseRequest.getResponse().getHttpOutput();
// Are we already being gzipped?
HttpOutput.Interceptor interceptor = out.getInterceptor();
while (interceptor != null)
{
if (interceptor instanceof BufferedInterceptor)
{
LOG.debug("{} already intercepting {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
interceptor = interceptor.getNextInterceptor();
}
// If not a supported method - no Vary because no matter what client, this URI is always excluded
if (!_methods.test(baseRequest.getMethod()))
{
LOG.debug("{} excluded by method {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If not a supported URI- no Vary because no matter what client, this URI is always excluded
// Use pathInfo because this is be
if (!isPathBufferable(path))
{
LOG.debug("{} excluded by path {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If the mime type is known from the path, then apply mime type filtering
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
if (mimeType != null)
{
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
if (!isMimeTypeBufferable(mimeType))
{
LOG.debug("{} excluded by path suffix mime type {}", this, request);
// handle normally without setting vary header
_handler.handle(target, baseRequest, request, response);
return;
}
}
// install interceptor and handle
out.setInterceptor(new BufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
if (_handler != null)
_handler.handle(target, baseRequest, request, response);
}
protected boolean isMimeTypeBufferable(String mimetype)
{
return _mimeTypes.test(mimetype);
@ -176,93 +114,115 @@ public class BufferedResponseHandler extends HandlerWrapper
return _paths.test(requestURI);
}
private class BufferedInterceptor implements HttpOutput.Interceptor
protected boolean shouldBuffer(HttpChannel channel, boolean last)
{
final Interceptor _next;
final HttpChannel _channel;
final Queue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
Boolean _aggregating;
ByteBuffer _aggregate;
if (last)
return false;
public BufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
Response response = channel.getResponse();
int status = response.getStatus();
if (HttpStatus.hasNoBody(status) || HttpStatus.isRedirection(status))
return false;
String ct = response.getContentType();
if (ct == null)
return true;
ct = MimeTypes.getContentTypeWithoutCharset(ct);
return isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
ServletContext context = baseRequest.getServletContext();
String path = context == null ? baseRequest.getRequestURI() : URIUtil.addPaths(baseRequest.getServletPath(), baseRequest.getPathInfo());
if (LOG.isDebugEnabled())
LOG.debug("{} handle {} in {}", this, baseRequest, context);
// Are we already buffering?
HttpOutput out = baseRequest.getResponse().getHttpOutput();
HttpOutput.Interceptor interceptor = out.getInterceptor();
while (interceptor != null)
{
if (interceptor instanceof BufferedInterceptor)
{
if (LOG.isDebugEnabled())
LOG.debug("{} already intercepting {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
interceptor = interceptor.getNextInterceptor();
}
// If not a supported method this URI is always excluded.
if (!_methods.test(baseRequest.getMethod()))
{
if (LOG.isDebugEnabled())
LOG.debug("{} excluded by method {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If not a supported path this URI is always excluded.
if (!isPathBufferable(path))
{
if (LOG.isDebugEnabled())
LOG.debug("{} excluded by path {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If the mime type is known from the path then apply mime type filtering.
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
if (mimeType != null)
{
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
if (!isMimeTypeBufferable(mimeType))
{
if (LOG.isDebugEnabled())
LOG.debug("{} excluded by path suffix mime type {}", this, request);
// handle normally without setting vary header
_handler.handle(target, baseRequest, request, response);
return;
}
}
// Install buffered interceptor and handle.
out.setInterceptor(newBufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
if (_handler != null)
_handler.handle(target, baseRequest, request, response);
}
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
return new ArrayBufferedInterceptor(httpChannel, interceptor);
}
/**
* An {@link HttpOutput.Interceptor} which is created by {@link #newBufferedInterceptor(HttpChannel, Interceptor)}
* and is used by the implementation to buffer outgoing content.
*/
protected interface BufferedInterceptor extends HttpOutput.Interceptor
{
}
private class ArrayBufferedInterceptor implements BufferedInterceptor
{
private final Interceptor _next;
private final HttpChannel _channel;
private final Queue<ByteBuffer> _buffers = new ArrayDeque<>();
private Boolean _aggregating;
private ByteBuffer _aggregate;
public ArrayBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
_next = interceptor;
_channel = httpChannel;
}
@Override
public void resetBuffer()
{
_buffers.clear();
_aggregating = null;
_aggregate = null;
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// if we are not committed, have to decide if we should aggregate or not
if (_aggregating == null)
{
Response response = _channel.getResponse();
int sc = response.getStatus();
if (sc > 0 && (sc < 200 || sc == 204 || sc == 205 || sc >= 300))
_aggregating = Boolean.FALSE; // No body
else
{
String ct = response.getContentType();
if (ct == null)
_aggregating = Boolean.TRUE;
else
{
ct = MimeTypes.getContentTypeWithoutCharset(ct);
_aggregating = isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
}
}
}
// If we are not aggregating, then handle normally
if (!_aggregating.booleanValue())
{
getNextInterceptor().write(content, last, callback);
return;
}
// If last
if (last)
{
// Add the current content to the buffer list without a copy
if (BufferUtil.length(content) > 0)
_buffers.add(content);
if (LOG.isDebugEnabled())
LOG.debug("{} committing {}", this, _buffers.size());
commit(_buffers, callback);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
// Aggregate the content into buffer chain
while (BufferUtil.hasContent(content))
{
// Do we need a new aggregate buffer
if (BufferUtil.space(_aggregate) == 0)
{
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
_aggregate = BufferUtil.allocate(size); // TODO use a buffer pool
_buffers.add(_aggregate);
}
BufferUtil.append(_aggregate, content);
}
callback.succeeded();
}
}
@Override
public Interceptor getNextInterceptor()
{
@ -275,21 +235,82 @@ public class BufferedResponseHandler extends HandlerWrapper
return false;
}
protected void commit(Queue<ByteBuffer> buffers, Callback callback)
@Override
public void resetBuffer()
{
// If only 1 buffer
if (_buffers.size() == 0)
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
else if (_buffers.size() == 1)
// just flush it with the last callback
getNextInterceptor().write(_buffers.remove(), true, callback);
_buffers.clear();
_aggregating = null;
_aggregate = null;
BufferedInterceptor.super.resetBuffer();
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// If we are not committed, have to decide if we should aggregate or not.
if (_aggregating == null)
_aggregating = shouldBuffer(_channel, last);
// If we are not aggregating, then handle normally.
if (!_aggregating)
{
getNextInterceptor().write(content, last, callback);
return;
}
if (last)
{
// Add the current content to the buffer list without a copy.
if (BufferUtil.length(content) > 0)
_buffers.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} committing {}", this, _buffers.size());
commit(callback);
}
else
{
// Create an iterating callback to do the writing
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
// Aggregate the content into buffer chain.
while (BufferUtil.hasContent(content))
{
// Do we need a new aggregate buffer.
if (BufferUtil.space(_aggregate) == 0)
{
// TODO: use a buffer pool always allocating with outputBufferSize to avoid polluting the ByteBufferPool.
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
_aggregate = BufferUtil.allocate(size);
_buffers.offer(_aggregate);
}
BufferUtil.append(_aggregate, content);
}
callback.succeeded();
}
}
private void commit(Callback callback)
{
if (_buffers.size() == 0)
{
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
}
else if (_buffers.size() == 1)
{
getNextInterceptor().write(_buffers.poll(), true, callback);
}
else
{
// Create an iterating callback to do the writing.
IteratingCallback icb = new IteratingCallback()
{
@Override
protected Action process() throws Exception
protected Action process()
{
ByteBuffer buffer = _buffers.poll();
if (buffer == null)
@ -302,14 +323,14 @@ public class BufferedResponseHandler extends HandlerWrapper
@Override
protected void onCompleteSuccess()
{
// Signal last callback
// Signal last callback.
callback.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
// Signal last callback
// Signal last callback.
callback.failed(cause);
}
};

View File

@ -0,0 +1,243 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpOutput.Interceptor;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
* mechanism to buffer the entire response content until the output is closed.
* This allows the commit to be delayed until the response is complete and thus
* headers and response status can be changed while writing the body.
* </p>
* <p>
* Note that the decision to buffer is influenced by the headers and status at the
* first write, and thus subsequent changes to those headers will not influence the
* decision to buffer or not.
* </p>
* <p>
* Note also that there are no memory limits to the size of the buffer, thus
* this handler can represent an unbounded memory commitment if the content
* generated can also be unbounded.
* </p>
*/
public class FileBufferedResponseHandler extends BufferedResponseHandler
{
private static final Logger LOG = Log.getLogger(FileBufferedResponseHandler.class);
private Path _tempDir = new File(System.getProperty("java.io.tmpdir")).toPath();
public Path getTempDir()
{
return _tempDir;
}
public void setTempDir(Path tempDir)
{
_tempDir = Objects.requireNonNull(tempDir);
}
@Override
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
return new FileBufferedInterceptor(httpChannel, interceptor);
}
private class FileBufferedInterceptor implements BufferedResponseHandler.BufferedInterceptor
{
private static final int MAX_MAPPED_BUFFER_SIZE = Integer.MAX_VALUE / 2;
private final Interceptor _next;
private final HttpChannel _channel;
private Boolean _aggregating;
private Path _filePath;
private OutputStream _fileOutputStream;
public FileBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
_next = interceptor;
_channel = httpChannel;
}
@Override
public Interceptor getNextInterceptor()
{
return _next;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
@Override
public void resetBuffer()
{
dispose();
BufferedInterceptor.super.resetBuffer();
}
private void dispose()
{
IO.close(_fileOutputStream);
_fileOutputStream = null;
_aggregating = null;
if (_filePath != null)
{
try
{
Files.delete(_filePath);
}
catch (Throwable t)
{
LOG.warn("Could not delete file {}", _filePath, t);
}
_filePath = null;
}
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// If we are not committed, must decide if we should aggregate or not.
if (_aggregating == null)
_aggregating = shouldBuffer(_channel, last);
// If we are not aggregating, then handle normally.
if (!_aggregating)
{
getNextInterceptor().write(content, last, callback);
return;
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
try
{
if (BufferUtil.hasContent(content))
aggregate(content);
}
catch (Throwable t)
{
dispose();
callback.failed(t);
return;
}
if (last)
commit(callback);
else
callback.succeeded();
}
private void aggregate(ByteBuffer content) throws IOException
{
if (_fileOutputStream == null)
{
// Create a new OutputStream to a file.
_filePath = Files.createTempFile(_tempDir, "BufferedResponse", "");
_fileOutputStream = Files.newOutputStream(_filePath, StandardOpenOption.WRITE);
}
BufferUtil.writeTo(content, _fileOutputStream);
}
private void commit(Callback callback)
{
if (_fileOutputStream == null)
{
// We have no content to write, signal next interceptor that we are finished.
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
return;
}
try
{
_fileOutputStream.close();
_fileOutputStream = null;
}
catch (Throwable t)
{
dispose();
callback.failed(t);
return;
}
// Create an iterating callback to do the writing
IteratingCallback icb = new IteratingCallback()
{
private final long fileLength = _filePath.toFile().length();
private long _pos = 0;
private boolean _last = false;
@Override
protected Action process() throws Exception
{
if (_last)
return Action.SUCCEEDED;
long len = Math.min(MAX_MAPPED_BUFFER_SIZE, fileLength - _pos);
_last = (_pos + len == fileLength);
ByteBuffer buffer = BufferUtil.toMappedBuffer(_filePath, _pos, len);
getNextInterceptor().write(buffer, _last, this);
_pos += len;
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
dispose();
callback.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
dispose();
callback.failed(cause);
}
};
icb.iterate();
}
}
}

View File

@ -0,0 +1,620 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.FileBufferedResponseHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FileBufferedResponseHandlerTest
{
private static final Logger LOG = Log.getLogger(FileBufferedResponseHandlerTest.class);
private Server _server;
private LocalConnector _localConnector;
private ServerConnector _serverConnector;
private Path _testDir;
private FileBufferedResponseHandler _bufferedHandler;
@BeforeEach
public void before() throws Exception
{
_testDir = MavenTestingUtils.getTargetTestingPath(FileBufferedResponseHandlerTest.class.getName());
FS.ensureDirExists(_testDir);
_server = new Server();
HttpConfiguration config = new HttpConfiguration();
config.setOutputBufferSize(1024);
config.setOutputAggregationSize(256);
_localConnector = new LocalConnector(_server, new HttpConnectionFactory(config));
_localConnector.setIdleTimeout(Duration.ofMinutes(1).toMillis());
_server.addConnector(_localConnector);
_serverConnector = new ServerConnector(_server, new HttpConnectionFactory(config));
_server.addConnector(_serverConnector);
_bufferedHandler = new FileBufferedResponseHandler();
_bufferedHandler.setTempDir(_testDir);
_bufferedHandler.getPathIncludeExclude().include("/include/*");
_bufferedHandler.getPathIncludeExclude().exclude("*.exclude");
_bufferedHandler.getMimeIncludeExclude().exclude("text/excluded");
_server.setHandler(_bufferedHandler);
FS.ensureEmpty(_testDir);
}
@AfterEach
public void after() throws Exception
{
_server.stop();
}
@Test
public void testPathNotIncluded() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testIncludedByPath() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was not committed after the first write and a file was created to buffer the response.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testExcludedByPath() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path.exclude HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testExcludedByMime() throws Exception
{
String excludedMimeType = "text/excluded";
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setContentType(excludedMimeType);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFlushed() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(1024);
PrintWriter writer = response.getWriter();
writer.println("a string smaller than the buffer size");
writer.println("NumFilesBeforeFlush: " + getNumFiles());
writer.flush();
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was not committed after the buffer was flushed and a file was created to buffer the response.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("NumFilesBeforeFlush: 0"));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testClosed() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("NumFiles: " + getNumFiles());
writer.close();
writer.println("writtenAfterClose");
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The content written after close was not sent.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("writtenAfterClose")));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testBufferSizeBig() throws Exception
{
int bufferSize = 4096;
String largeContent = generateContent(bufferSize - 64);
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(bufferSize);
PrintWriter writer = response.getWriter();
writer.println(largeContent);
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The content written was not buffered as a file as it was less than the buffer size.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("writtenAfterClose")));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFlushEmpty() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(1024);
PrintWriter writer = response.getWriter();
writer.flush();
int numFiles = getNumFiles();
writer.println("NumFiles: " + numFiles);
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The flush should not create the file unless there is content to write.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testReset() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(8);
PrintWriter writer = response.getWriter();
writer.println("THIS WILL BE RESET");
writer.flush();
writer.println("THIS WILL BE RESET");
int numFilesBeforeReset = getNumFiles();
response.resetBuffer();
int numFilesAfterReset = getNumFiles();
writer.println("NumFilesBeforeReset: " + numFilesBeforeReset);
writer.println("NumFilesAfterReset: " + numFilesAfterReset);
writer.println("a string larger than the buffer size");
writer.println("NumFilesAfterWrite: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// Resetting the response buffer will delete the file.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("THIS WILL BE RESET")));
assertThat(responseContent, containsString("NumFilesBeforeReset: 1"));
assertThat(responseContent, containsString("NumFilesAfterReset: 0"));
assertThat(responseContent, containsString("NumFilesAfterWrite: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFileLargerThanMaxInteger() throws Exception
{
long fileSize = Integer.MAX_VALUE + 1234L;
byte[] bytes = randomBytes(1024 * 1024);
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
ServletOutputStream outputStream = response.getOutputStream();
long written = 0;
while (written < fileSize)
{
int length = Math.toIntExact(Math.min(bytes.length, fileSize - written));
outputStream.write(bytes, 0, length);
written += length;
}
outputStream.flush();
response.setHeader("NumFiles", Integer.toString(getNumFiles()));
response.setHeader("FileSize", Long.toString(getFileSize()));
}
});
_server.start();
AtomicLong received = new AtomicLong();
HttpTester.Response response = new HttpTester.Response()
{
@Override
public boolean content(ByteBuffer ref)
{
// Verify the content is what was sent.
while (ref.hasRemaining())
{
byte byteFromBuffer = ref.get();
long totalReceived = received.getAndIncrement();
int bytesIndex = (int)(totalReceived % bytes.length);
byte byteFromArray = bytes[bytesIndex];
if (byteFromBuffer != byteFromArray)
{
LOG.warn("Mismatch at index {} received bytes {}, {}!={}", bytesIndex, totalReceived, byteFromBuffer, byteFromArray, new IllegalStateException());
return true;
}
}
return false;
}
};
try (Socket socket = new Socket("localhost", _serverConnector.getLocalPort()))
{
OutputStream output = socket.getOutputStream();
String request = "GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
HttpTester.Input input = HttpTester.from(socket.getInputStream());
HttpTester.parseResponse(input, response);
}
assertTrue(response.isComplete());
assertThat(response.get("NumFiles"), is("1"));
assertThat(response.get("FileSize"), is(Long.toString(fileSize)));
assertThat(received.get(), is(fileSize));
assertThat(getNumFiles(), is(0));
}
@Test
public void testNextInterceptorFailed() throws Exception
{
AbstractHandler failingInterceptorHandler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
HttpOutput httpOutput = baseRequest.getResponse().getHttpOutput();
HttpOutput.Interceptor nextInterceptor = httpOutput.getInterceptor();
httpOutput.setInterceptor(new HttpOutput.Interceptor()
{
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
callback.failed(new Throwable("intentionally throwing from interceptor"));
}
@Override
public HttpOutput.Interceptor getNextInterceptor()
{
return nextInterceptor;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
});
}
};
_server.setHandler(new HandlerCollection(failingInterceptorHandler, _server.getHandler()));
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
byte[] chunk1 = "this content will ".getBytes();
byte[] chunk2 = "be buffered in a file".getBytes();
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream outputStream = response.getOutputStream();
// Write chunk1 and then flush so it is written to the file.
outputStream.write(chunk1);
outputStream.flush();
assertThat(getNumFiles(), is(1));
try
{
// ContentLength is set so it knows this is the last write.
// This will cause the file to be written to the next interceptor which will fail.
outputStream.write(chunk2);
}
catch (Throwable t)
{
errorFuture.complete(t);
throw t;
}
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
// Response was aborted.
assertThat(response.getStatus(), is(0));
// We failed because of the next interceptor.
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
assertThat(error.getMessage(), containsString("intentionally throwing from interceptor"));
// All files were deleted.
assertThat(getNumFiles(), is(0));
}
@Test
public void testFileWriteFailed() throws Exception
{
// Set the temp directory to an empty directory so that the file cannot be created.
File tempDir = MavenTestingUtils.getTargetTestingDir(getClass().getSimpleName());
FS.ensureDeleted(tempDir);
_bufferedHandler.setTempDir(tempDir.toPath());
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
ServletOutputStream outputStream = response.getOutputStream();
byte[] content = "this content will be buffered in a file".getBytes();
try
{
// Write the content and flush it to the file.
// This should throw as it cannot create the file to aggregate into.
outputStream.write(content);
outputStream.flush();
}
catch (Throwable t)
{
errorFuture.complete(t);
throw t;
}
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
// Response was aborted.
assertThat(response.getStatus(), is(0));
// We failed because cannot create the file.
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
assertThat(error, instanceOf(NoSuchFileException.class));
// No files were created.
assertThat(getNumFiles(), is(0));
}
private int getNumFiles()
{
File[] files = _testDir.toFile().listFiles();
if (files == null)
return 0;
return files.length;
}
private long getFileSize()
{
File[] files = _testDir.toFile().listFiles();
assertNotNull(files);
assertThat(files.length, is(1));
return files[0].length();
}
private static String generateContent(int size)
{
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder(size);
for (int i = 0; i < size; i++)
{
stringBuilder.append((char)Math.abs(random.nextInt(0x7F)));
}
return stringBuilder.toString();
}
@SuppressWarnings("SameParameterValue")
private byte[] randomBytes(int size)
{
byte[] data = new byte[size];
new Random().nextBytes(data);
return data;
}
}

View File

@ -30,6 +30,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
@ -985,9 +986,14 @@ public class BufferUtil
public static ByteBuffer toMappedBuffer(File file) throws IOException
{
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ))
return toMappedBuffer(file.toPath(), 0, file.length());
}
public static ByteBuffer toMappedBuffer(Path filePath, long pos, long len) throws IOException
{
try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ))
{
return channel.map(MapMode.READ_ONLY, 0, file.length());
return channel.map(MapMode.READ_ONLY, pos, len);
}
}