Create FileBufferedResponseHandler to buffer responses into a file. (#6010)

FileBufferedResponseHandler adds an HttpOutput.Interceptor to buffer all responses into a file until the output is closed. This allows the commit to be delayed until the response is complete and thus headers and response status can be changed while writing the body.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2021-04-19 11:02:44 +10:00 committed by GitHub
parent b872c1d751
commit 4c98990cd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1048 additions and 170 deletions

View File

@ -314,8 +314,9 @@ public class HttpStatus
switch (status)
{
case NO_CONTENT_204:
case NOT_MODIFIED_304:
case RESET_CONTENT_205:
case PARTIAL_CONTENT_206:
case NOT_MODIFIED_304:
return true;
default:

View File

@ -42,6 +42,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
static final Logger LOG = LoggerFactory.getLogger(ByteArrayEndPoint.class);
static final InetAddress NOIP;
static final InetSocketAddress NOIPPORT;
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024;
static
{
@ -67,6 +68,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
private final AutoLock _lock = new AutoLock();
private final Condition _hasOutput = _lock.newCondition();
private final Queue<ByteBuffer> _inQ = new ArrayDeque<>();
private final int _outputSize;
private ByteBuffer _out;
private boolean _growOutput;
@ -113,7 +115,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
super(timer);
if (BufferUtil.hasContent(input))
addInput(input);
_out = output == null ? BufferUtil.allocate(1024) : output;
_outputSize = (output == null) ? 1024 : output.capacity();
_out = output == null ? BufferUtil.allocate(_outputSize) : output;
setIdleTimeout(idleTimeoutMs);
onOpen();
}
@ -290,7 +293,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
try (AutoLock lock = _lock.lock())
{
b = _out;
_out = BufferUtil.allocate(b.capacity());
_out = BufferUtil.allocate(_outputSize);
}
getWriteFlusher().completeWrite();
return b;
@ -316,7 +319,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
return null;
}
b = _out;
_out = BufferUtil.allocate(b.capacity());
_out = BufferUtil.allocate(_outputSize);
}
getWriteFlusher().completeWrite();
return b;
@ -424,11 +427,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint
BufferUtil.compact(_out);
if (b.remaining() > BufferUtil.space(_out))
{
ByteBuffer n = BufferUtil.allocate(_out.capacity() + b.remaining() * 2);
// Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues.
if (_out.capacity() < MAX_BUFFER_SIZE)
{
long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE);
ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity));
BufferUtil.append(n, _out);
_out = n;
}
}
}
if (BufferUtil.append(_out, b) > 0)
idle = false;

View File

@ -23,7 +23,6 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import javax.servlet.RequestDispatcher;
@ -627,6 +626,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
catch (Throwable t)
{
onWriteComplete(true, t);
throw t;
}
}
}

View File

@ -15,14 +15,15 @@ package org.eclipse.jetty.server.handler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.pathmap.PathSpecSet;
import org.eclipse.jetty.server.HttpChannel;
@ -39,16 +40,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Buffered Response Handler
* <p>
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
* mechanism to buffer the entire response content until the output is closed.
* This allows the commit to be delayed until the response is complete and thus
* headers and response status can be changed while writing the body.
* </p>
* <p>
* Note that the decision to buffer is influenced by the headers and status at the
* first write, and thus subsequent changes to those headers will not influence the
* decision to buffer or not.
* </p>
* <p>
* Note also that there are no memory limits to the size of the buffer, thus
* this handler can represent an unbounded memory commitment if the content
@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory;
*/
public class BufferedResponseHandler extends HandlerWrapper
{
static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class);
private static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class);
private final IncludeExclude<String> _methods = new IncludeExclude<>();
private final IncludeExclude<String> _paths = new IncludeExclude<>(PathSpecSet.class);
@ -65,10 +67,7 @@ public class BufferedResponseHandler extends HandlerWrapper
public BufferedResponseHandler()
{
// include only GET requests
_methods.include(HttpMethod.GET.asString());
// Exclude images, aduio and video from buffering
for (String type : MimeTypes.getKnownMimeTypes())
{
if (type.startsWith("image/") ||
@ -76,6 +75,8 @@ public class BufferedResponseHandler extends HandlerWrapper
type.startsWith("video/"))
_mimeTypes.exclude(type);
}
if (LOG.isDebugEnabled())
LOG.debug("{} mime types {}", this, _mimeTypes);
}
@ -94,66 +95,6 @@ public class BufferedResponseHandler extends HandlerWrapper
return _mimeTypes;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
final ServletContext context = baseRequest.getServletContext();
final String path = baseRequest.getPathInContext();
LOG.debug("{} handle {} in {}", this, baseRequest, context);
HttpOutput out = baseRequest.getResponse().getHttpOutput();
// Are we already being gzipped?
HttpOutput.Interceptor interceptor = out.getInterceptor();
while (interceptor != null)
{
if (interceptor instanceof BufferedInterceptor)
{
LOG.debug("{} already intercepting {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
interceptor = interceptor.getNextInterceptor();
}
// If not a supported method - no Vary because no matter what client, this URI is always excluded
if (!_methods.test(baseRequest.getMethod()))
{
LOG.debug("{} excluded by method {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If not a supported URI- no Vary because no matter what client, this URI is always excluded
// Use pathInfo because this is be
if (!isPathBufferable(path))
{
LOG.debug("{} excluded by path {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If the mime type is known from the path, then apply mime type filtering
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
if (mimeType != null)
{
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
if (!isMimeTypeBufferable(mimeType))
{
LOG.debug("{} excluded by path suffix mime type {}", this, request);
// handle normally without setting vary header
_handler.handle(target, baseRequest, request, response);
return;
}
}
// install interceptor and handle
out.setInterceptor(new BufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
if (_handler != null)
_handler.handle(target, baseRequest, request, response);
}
protected boolean isMimeTypeBufferable(String mimetype)
{
return _mimeTypes.test(mimetype);
@ -167,93 +108,113 @@ public class BufferedResponseHandler extends HandlerWrapper
return _paths.test(requestURI);
}
private class BufferedInterceptor implements HttpOutput.Interceptor
protected boolean shouldBuffer(HttpChannel channel, boolean last)
{
final Interceptor _next;
final HttpChannel _channel;
final Queue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
Boolean _aggregating;
ByteBuffer _aggregate;
if (last)
return false;
public BufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
_next = interceptor;
_channel = httpChannel;
}
Response response = channel.getResponse();
int status = response.getStatus();
if (HttpStatus.hasNoBody(status) || HttpStatus.isRedirection(status))
return false;
@Override
public void resetBuffer()
{
_buffers.clear();
_aggregating = null;
_aggregate = null;
}
;
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// if we are not committed, have to decide if we should aggregate or not
if (_aggregating == null)
{
Response response = _channel.getResponse();
int sc = response.getStatus();
if (sc > 0 && (sc < 200 || sc == 204 || sc == 205 || sc >= 300))
_aggregating = Boolean.FALSE; // No body
else
{
String ct = response.getContentType();
if (ct == null)
_aggregating = Boolean.TRUE;
else
{
return true;
ct = MimeTypes.getContentTypeWithoutCharset(ct);
_aggregating = isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
}
}
return isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
}
// If we are not aggregating, then handle normally
if (!_aggregating.booleanValue())
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
getNextInterceptor().write(content, last, callback);
final ServletContext context = baseRequest.getServletContext();
final String path = baseRequest.getPathInContext();
if (LOG.isDebugEnabled())
LOG.debug("{} handle {} in {}", this, baseRequest, context);
// Are we already buffering?
HttpOutput out = baseRequest.getResponse().getHttpOutput();
HttpOutput.Interceptor interceptor = out.getInterceptor();
while (interceptor != null)
{
if (interceptor instanceof BufferedInterceptor)
{
if (LOG.isDebugEnabled())
LOG.debug("{} already intercepting {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
interceptor = interceptor.getNextInterceptor();
}
// If not a supported method this URI is always excluded.
if (!_methods.test(baseRequest.getMethod()))
{
if (LOG.isDebugEnabled())
LOG.debug("{} excluded by method {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
// If last
if (last)
{
// Add the current content to the buffer list without a copy
if (BufferUtil.length(content) > 0)
_buffers.add(content);
if (LOG.isDebugEnabled())
LOG.debug("{} committing {}", this, _buffers.size());
commit(_buffers, callback);
}
else
// If not a supported path this URI is always excluded.
if (!isPathBufferable(path))
{
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
// Aggregate the content into buffer chain
while (BufferUtil.hasContent(content))
{
// Do we need a new aggregate buffer
if (BufferUtil.space(_aggregate) == 0)
{
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
_aggregate = BufferUtil.allocate(size); // TODO use a buffer pool
_buffers.add(_aggregate);
LOG.debug("{} excluded by path {}", this, request);
_handler.handle(target, baseRequest, request, response);
return;
}
BufferUtil.append(_aggregate, content);
// If the mime type is known from the path then apply mime type filtering.
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
if (mimeType != null)
{
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
if (!isMimeTypeBufferable(mimeType))
{
if (LOG.isDebugEnabled())
LOG.debug("{} excluded by path suffix mime type {}", this, request);
// handle normally without setting vary header
_handler.handle(target, baseRequest, request, response);
return;
}
callback.succeeded();
}
// Install buffered interceptor and handle.
out.setInterceptor(newBufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
if (_handler != null)
_handler.handle(target, baseRequest, request, response);
}
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
return new ArrayBufferedInterceptor(httpChannel, interceptor);
}
/**
* An {@link HttpOutput.Interceptor} which is created by {@link #newBufferedInterceptor(HttpChannel, Interceptor)}
* and is used by the implementation to buffer outgoing content.
*/
protected interface BufferedInterceptor extends HttpOutput.Interceptor
{
}
private class ArrayBufferedInterceptor implements BufferedInterceptor
{
private final Interceptor _next;
private final HttpChannel _channel;
private final Queue<ByteBuffer> _buffers = new ArrayDeque<>();
private Boolean _aggregating;
private ByteBuffer _aggregate;
public ArrayBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
_next = interceptor;
_channel = httpChannel;
}
@Override
@ -262,21 +223,82 @@ public class BufferedResponseHandler extends HandlerWrapper
return _next;
}
protected void commit(Queue<ByteBuffer> buffers, Callback callback)
@Override
public void resetBuffer()
{
// If only 1 buffer
if (_buffers.size() == 0)
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
else if (_buffers.size() == 1)
// just flush it with the last callback
getNextInterceptor().write(_buffers.remove(), true, callback);
_buffers.clear();
_aggregating = null;
_aggregate = null;
BufferedInterceptor.super.resetBuffer();
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// If we are not committed, have to decide if we should aggregate or not.
if (_aggregating == null)
_aggregating = shouldBuffer(_channel, last);
// If we are not aggregating, then handle normally.
if (!_aggregating)
{
getNextInterceptor().write(content, last, callback);
return;
}
if (last)
{
// Add the current content to the buffer list without a copy.
if (BufferUtil.length(content) > 0)
_buffers.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} committing {}", this, _buffers.size());
commit(callback);
}
else
{
// Create an iterating callback to do the writing
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
// Aggregate the content into buffer chain.
while (BufferUtil.hasContent(content))
{
// Do we need a new aggregate buffer.
if (BufferUtil.space(_aggregate) == 0)
{
// TODO: use a buffer pool always allocating with outputBufferSize to avoid polluting the ByteBufferPool.
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
_aggregate = BufferUtil.allocate(size);
_buffers.offer(_aggregate);
}
BufferUtil.append(_aggregate, content);
}
callback.succeeded();
}
}
private void commit(Callback callback)
{
if (_buffers.size() == 0)
{
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
}
else if (_buffers.size() == 1)
{
getNextInterceptor().write(_buffers.poll(), true, callback);
}
else
{
// Create an iterating callback to do the writing.
IteratingCallback icb = new IteratingCallback()
{
@Override
protected Action process() throws Exception
protected Action process()
{
ByteBuffer buffer = _buffers.poll();
if (buffer == null)
@ -289,14 +311,14 @@ public class BufferedResponseHandler extends HandlerWrapper
@Override
protected void onCompleteSuccess()
{
// Signal last callback
// Signal last callback.
callback.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
// Signal last callback
// Signal last callback.
callback.failed(cause);
}
};

View File

@ -0,0 +1,232 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server.handler;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpOutput.Interceptor;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.IteratingCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
* mechanism to buffer the entire response content until the output is closed.
* This allows the commit to be delayed until the response is complete and thus
* headers and response status can be changed while writing the body.
* </p>
* <p>
* Note that the decision to buffer is influenced by the headers and status at the
* first write, and thus subsequent changes to those headers will not influence the
* decision to buffer or not.
* </p>
* <p>
* Note also that there are no memory limits to the size of the buffer, thus
* this handler can represent an unbounded memory commitment if the content
* generated can also be unbounded.
* </p>
*/
public class FileBufferedResponseHandler extends BufferedResponseHandler
{
private static final Logger LOG = LoggerFactory.getLogger(FileBufferedResponseHandler.class);
private Path _tempDir = new File(System.getProperty("java.io.tmpdir")).toPath();
public Path getTempDir()
{
return _tempDir;
}
public void setTempDir(Path tempDir)
{
_tempDir = Objects.requireNonNull(tempDir);
}
@Override
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
return new FileBufferedInterceptor(httpChannel, interceptor);
}
private class FileBufferedInterceptor implements BufferedResponseHandler.BufferedInterceptor
{
private static final int MAX_MAPPED_BUFFER_SIZE = Integer.MAX_VALUE / 2;
private final Interceptor _next;
private final HttpChannel _channel;
private Boolean _aggregating;
private Path _filePath;
private OutputStream _fileOutputStream;
public FileBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
{
_next = interceptor;
_channel = httpChannel;
}
@Override
public Interceptor getNextInterceptor()
{
return _next;
}
@Override
public void resetBuffer()
{
dispose();
BufferedInterceptor.super.resetBuffer();
}
private void dispose()
{
IO.close(_fileOutputStream);
_fileOutputStream = null;
_aggregating = null;
if (_filePath != null)
{
try
{
Files.delete(_filePath);
}
catch (Throwable t)
{
LOG.warn("Could not delete file {}", _filePath, t);
}
_filePath = null;
}
}
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
// If we are not committed, must decide if we should aggregate or not.
if (_aggregating == null)
_aggregating = shouldBuffer(_channel, last);
// If we are not aggregating, then handle normally.
if (!_aggregating)
{
getNextInterceptor().write(content, last, callback);
return;
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregating", this);
try
{
if (BufferUtil.hasContent(content))
aggregate(content);
}
catch (Throwable t)
{
dispose();
callback.failed(t);
return;
}
if (last)
commit(callback);
else
callback.succeeded();
}
private void aggregate(ByteBuffer content) throws IOException
{
if (_fileOutputStream == null)
{
// Create a new OutputStream to a file.
_filePath = Files.createTempFile(_tempDir, "BufferedResponse", "");
_fileOutputStream = Files.newOutputStream(_filePath, StandardOpenOption.WRITE);
}
BufferUtil.writeTo(content, _fileOutputStream);
}
private void commit(Callback callback)
{
if (_fileOutputStream == null)
{
// We have no content to write, signal next interceptor that we are finished.
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
return;
}
try
{
_fileOutputStream.close();
_fileOutputStream = null;
}
catch (Throwable t)
{
dispose();
callback.failed(t);
return;
}
// Create an iterating callback to do the writing
IteratingCallback icb = new IteratingCallback()
{
private final long fileLength = _filePath.toFile().length();
private long _pos = 0;
private boolean _last = false;
@Override
protected Action process() throws Exception
{
if (_last)
return Action.SUCCEEDED;
long len = Math.min(MAX_MAPPED_BUFFER_SIZE, fileLength - _pos);
_last = (_pos + len == fileLength);
ByteBuffer buffer = BufferUtil.toMappedBuffer(_filePath, _pos, len);
getNextInterceptor().write(buffer, _last, this);
_pos += len;
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
dispose();
callback.succeeded();
}
@Override
protected void onCompleteFailure(Throwable cause)
{
dispose();
callback.failed(cause);
}
};
icb.iterate();
}
}
}

View File

@ -0,0 +1,609 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.FileBufferedResponseHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FileBufferedResponseHandlerTest
{
private static final Logger LOG = LoggerFactory.getLogger(FileBufferedResponseHandlerTest.class);
private Server _server;
private LocalConnector _localConnector;
private ServerConnector _serverConnector;
private Path _testDir;
private FileBufferedResponseHandler _bufferedHandler;
@BeforeEach
public void before() throws Exception
{
_testDir = MavenTestingUtils.getTargetTestingPath(FileBufferedResponseHandlerTest.class.getName());
FS.ensureDirExists(_testDir);
_server = new Server();
HttpConfiguration config = new HttpConfiguration();
config.setOutputBufferSize(1024);
config.setOutputAggregationSize(256);
_localConnector = new LocalConnector(_server, new HttpConnectionFactory(config));
_localConnector.setIdleTimeout(Duration.ofMinutes(1).toMillis());
_server.addConnector(_localConnector);
_serverConnector = new ServerConnector(_server, new HttpConnectionFactory(config));
_server.addConnector(_serverConnector);
_bufferedHandler = new FileBufferedResponseHandler();
_bufferedHandler.setTempDir(_testDir);
_bufferedHandler.getPathIncludeExclude().include("/include/*");
_bufferedHandler.getPathIncludeExclude().exclude("*.exclude");
_bufferedHandler.getMimeIncludeExclude().exclude("text/excluded");
_server.setHandler(_bufferedHandler);
FS.ensureEmpty(_testDir);
}
@AfterEach
public void after() throws Exception
{
_server.stop();
}
@Test
public void testPathNotIncluded() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testIncludedByPath() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was not committed after the first write and a file was created to buffer the response.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testExcludedByPath() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path.exclude HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testExcludedByMime() throws Exception
{
String excludedMimeType = "text/excluded";
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setContentType(excludedMimeType);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was committed after the first write and we never created a file to buffer the response into.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("Committed: true"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFlushed() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(1024);
PrintWriter writer = response.getWriter();
writer.println("a string smaller than the buffer size");
writer.println("NumFilesBeforeFlush: " + getNumFiles());
writer.flush();
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The response was not committed after the buffer was flushed and a file was created to buffer the response.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("NumFilesBeforeFlush: 0"));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testClosed() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(10);
PrintWriter writer = response.getWriter();
writer.println("a string larger than the buffer size");
writer.println("NumFiles: " + getNumFiles());
writer.close();
writer.println("writtenAfterClose");
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The content written after close was not sent.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("writtenAfterClose")));
assertThat(responseContent, containsString("NumFiles: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testBufferSizeBig() throws Exception
{
int bufferSize = 4096;
String largeContent = generateContent(bufferSize - 64);
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(bufferSize);
PrintWriter writer = response.getWriter();
writer.println(largeContent);
writer.println("Committed: " + response.isCommitted());
writer.println("NumFiles: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The content written was not buffered as a file as it was less than the buffer size.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("writtenAfterClose")));
assertThat(responseContent, containsString("Committed: false"));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFlushEmpty() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(1024);
PrintWriter writer = response.getWriter();
writer.flush();
int numFiles = getNumFiles();
writer.println("NumFiles: " + numFiles);
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// The flush should not create the file unless there is content to write.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, containsString("NumFiles: 0"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testReset() throws Exception
{
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
response.setBufferSize(8);
PrintWriter writer = response.getWriter();
writer.println("THIS WILL BE RESET");
writer.flush();
writer.println("THIS WILL BE RESET");
int numFilesBeforeReset = getNumFiles();
response.resetBuffer();
int numFilesAfterReset = getNumFiles();
writer.println("NumFilesBeforeReset: " + numFilesBeforeReset);
writer.println("NumFilesAfterReset: " + numFilesAfterReset);
writer.println("a string larger than the buffer size");
writer.println("NumFilesAfterWrite: " + getNumFiles());
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
String responseContent = response.getContent();
// Resetting the response buffer will delete the file.
assertThat(response.getStatus(), is(HttpStatus.OK_200));
assertThat(responseContent, not(containsString("THIS WILL BE RESET")));
assertThat(responseContent, containsString("NumFilesBeforeReset: 1"));
assertThat(responseContent, containsString("NumFilesAfterReset: 0"));
assertThat(responseContent, containsString("NumFilesAfterWrite: 1"));
assertThat(getNumFiles(), is(0));
}
@Test
public void testFileLargerThanMaxInteger() throws Exception
{
long fileSize = Integer.MAX_VALUE + 1234L;
byte[] bytes = randomBytes(1024 * 1024);
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
ServletOutputStream outputStream = response.getOutputStream();
long written = 0;
while (written < fileSize)
{
int length = Math.toIntExact(Math.min(bytes.length, fileSize - written));
outputStream.write(bytes, 0, length);
written += length;
}
outputStream.flush();
response.setHeader("NumFiles", Integer.toString(getNumFiles()));
response.setHeader("FileSize", Long.toString(getFileSize()));
}
});
_server.start();
AtomicLong received = new AtomicLong();
HttpTester.Response response = new HttpTester.Response()
{
@Override
public boolean content(ByteBuffer ref)
{
// Verify the content is what was sent.
while (ref.hasRemaining())
{
byte byteFromBuffer = ref.get();
long totalReceived = received.getAndIncrement();
int bytesIndex = (int)(totalReceived % bytes.length);
byte byteFromArray = bytes[bytesIndex];
if (byteFromBuffer != byteFromArray)
{
LOG.warn("Mismatch at index {} received bytes {}, {}!={}", bytesIndex, totalReceived, byteFromBuffer, byteFromArray, new IllegalStateException());
return true;
}
}
return false;
}
};
try (Socket socket = new Socket("localhost", _serverConnector.getLocalPort()))
{
OutputStream output = socket.getOutputStream();
String request = "GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
HttpTester.Input input = HttpTester.from(socket.getInputStream());
HttpTester.parseResponse(input, response);
}
assertTrue(response.isComplete());
assertThat(response.get("NumFiles"), is("1"));
assertThat(response.get("FileSize"), is(Long.toString(fileSize)));
assertThat(received.get(), is(fileSize));
assertThat(getNumFiles(), is(0));
}
@Test
public void testNextInterceptorFailed() throws Exception
{
AbstractHandler failingInterceptorHandler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
HttpOutput httpOutput = baseRequest.getResponse().getHttpOutput();
HttpOutput.Interceptor nextInterceptor = httpOutput.getInterceptor();
httpOutput.setInterceptor(new HttpOutput.Interceptor()
{
@Override
public void write(ByteBuffer content, boolean last, Callback callback)
{
callback.failed(new Throwable("intentionally throwing from interceptor"));
}
@Override
public HttpOutput.Interceptor getNextInterceptor()
{
return nextInterceptor;
}
});
}
};
_server.setHandler(new HandlerCollection(failingInterceptorHandler, _server.getHandler()));
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
byte[] chunk1 = "this content will ".getBytes();
byte[] chunk2 = "be buffered in a file".getBytes();
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream outputStream = response.getOutputStream();
// Write chunk1 and then flush so it is written to the file.
outputStream.write(chunk1);
outputStream.flush();
assertThat(getNumFiles(), is(1));
try
{
// ContentLength is set so it knows this is the last write.
// This will cause the file to be written to the next interceptor which will fail.
outputStream.write(chunk2);
}
catch (Throwable t)
{
errorFuture.complete(t);
throw t;
}
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
// Response was aborted.
assertThat(response.getStatus(), is(0));
// We failed because of the next interceptor.
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
assertThat(error.getMessage(), containsString("intentionally throwing from interceptor"));
// All files were deleted.
assertThat(getNumFiles(), is(0));
}
@Test
public void testFileWriteFailed() throws Exception
{
// Set the temp directory to an empty directory so that the file cannot be created.
File tempDir = MavenTestingUtils.getTargetTestingDir(getClass().getSimpleName());
FS.ensureDeleted(tempDir);
_bufferedHandler.setTempDir(tempDir.toPath());
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
_bufferedHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
ServletOutputStream outputStream = response.getOutputStream();
byte[] content = "this content will be buffered in a file".getBytes();
try
{
// Write the content and flush it to the file.
// This should throw as it cannot create the file to aggregate into.
outputStream.write(content);
outputStream.flush();
}
catch (Throwable t)
{
errorFuture.complete(t);
throw t;
}
}
});
_server.start();
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
// Response was aborted.
assertThat(response.getStatus(), is(0));
// We failed because cannot create the file.
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
assertThat(error, instanceOf(NoSuchFileException.class));
// No files were created.
assertThat(getNumFiles(), is(0));
}
private int getNumFiles()
{
File[] files = _testDir.toFile().listFiles();
if (files == null)
return 0;
return files.length;
}
private long getFileSize()
{
File[] files = _testDir.toFile().listFiles();
assertNotNull(files);
assertThat(files.length, is(1));
return files[0].length();
}
private static String generateContent(int size)
{
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder(size);
for (int i = 0; i < size; i++)
{
stringBuilder.append((char)Math.abs(random.nextInt(0x7F)));
}
return stringBuilder.toString();
}
@SuppressWarnings("SameParameterValue")
private byte[] randomBytes(int size)
{
byte[] data = new byte[size];
new Random().nextBytes(data);
return data;
}
}

View File

@ -25,6 +25,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
@ -1032,9 +1033,14 @@ public class BufferUtil
public static ByteBuffer toMappedBuffer(File file) throws IOException
{
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ))
return toMappedBuffer(file.toPath(), 0, file.length());
}
public static ByteBuffer toMappedBuffer(Path filePath, long pos, long len) throws IOException
{
return channel.map(MapMode.READ_ONLY, 0, file.length());
try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ))
{
return channel.map(MapMode.READ_ONLY, pos, len);
}
}