Create FileBufferedResponseHandler to buffer responses into a file. (#6010)
FileBufferedResponseHandler adds an HttpOutput.Interceptor to buffer all responses into a file until the output is closed. This allows the commit to be delayed until the response is complete and thus headers and response status can be changed while writing the body. Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
39fe2ecc4d
commit
eca8edcea5
|
@ -325,8 +325,9 @@ public class HttpStatus
|
|||
switch (status)
|
||||
{
|
||||
case NO_CONTENT_204:
|
||||
case NOT_MODIFIED_304:
|
||||
case RESET_CONTENT_205:
|
||||
case PARTIAL_CONTENT_206:
|
||||
case NOT_MODIFIED_304:
|
||||
return true;
|
||||
|
||||
default:
|
||||
|
|
|
@ -47,6 +47,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
|
||||
static final InetAddress NOIP;
|
||||
static final InetSocketAddress NOIPPORT;
|
||||
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024;
|
||||
|
||||
static
|
||||
{
|
||||
|
@ -80,6 +81,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
private final Locker _locker = new Locker();
|
||||
private final Condition _hasOutput = _locker.newCondition();
|
||||
private final Queue<ByteBuffer> _inQ = new ArrayDeque<>();
|
||||
private final int _outputSize;
|
||||
private ByteBuffer _out;
|
||||
private boolean _growOutput;
|
||||
|
||||
|
@ -129,7 +131,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
super(timer);
|
||||
if (BufferUtil.hasContent(input))
|
||||
addInput(input);
|
||||
_out = output == null ? BufferUtil.allocate(1024) : output;
|
||||
_outputSize = (output == null) ? 1024 : output.capacity();
|
||||
_out = output == null ? BufferUtil.allocate(_outputSize) : output;
|
||||
setIdleTimeout(idleTimeoutMs);
|
||||
onOpen();
|
||||
}
|
||||
|
@ -296,7 +299,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
try (Locker.Lock lock = _locker.lock())
|
||||
{
|
||||
b = _out;
|
||||
_out = BufferUtil.allocate(b.capacity());
|
||||
_out = BufferUtil.allocate(_outputSize);
|
||||
}
|
||||
getWriteFlusher().completeWrite();
|
||||
return b;
|
||||
|
@ -322,7 +325,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
return null;
|
||||
}
|
||||
b = _out;
|
||||
_out = BufferUtil.allocate(b.capacity());
|
||||
_out = BufferUtil.allocate(_outputSize);
|
||||
}
|
||||
getWriteFlusher().completeWrite();
|
||||
return b;
|
||||
|
@ -436,9 +439,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
|
|||
BufferUtil.compact(_out);
|
||||
if (b.remaining() > BufferUtil.space(_out))
|
||||
{
|
||||
ByteBuffer n = BufferUtil.allocate(_out.capacity() + b.remaining() * 2);
|
||||
BufferUtil.append(n, _out);
|
||||
_out = n;
|
||||
// Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues.
|
||||
if (_out.capacity() < MAX_BUFFER_SIZE)
|
||||
{
|
||||
long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE);
|
||||
ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity));
|
||||
BufferUtil.append(n, _out);
|
||||
_out = n;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -642,6 +642,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
catch (Throwable t)
|
||||
{
|
||||
onWriteComplete(true, t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,14 +20,15 @@ package org.eclipse.jetty.server.handler;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.MimeTypes;
|
||||
import org.eclipse.jetty.http.pathmap.PathSpecSet;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
|
@ -45,16 +46,17 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* Buffered Response Handler
|
||||
* <p>
|
||||
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
|
||||
* mechanism to buffer the entire response content until the output is closed.
|
||||
* This allows the commit to be delayed until the response is complete and thus
|
||||
* headers and response status can be changed while writing the body.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note that the decision to buffer is influenced by the headers and status at the
|
||||
* first write, and thus subsequent changes to those headers will not influence the
|
||||
* decision to buffer or not.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note also that there are no memory limits to the size of the buffer, thus
|
||||
* this handler can represent an unbounded memory commitment if the content
|
||||
|
@ -63,7 +65,7 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
*/
|
||||
public class BufferedResponseHandler extends HandlerWrapper
|
||||
{
|
||||
static final Logger LOG = Log.getLogger(BufferedResponseHandler.class);
|
||||
private static final Logger LOG = Log.getLogger(BufferedResponseHandler.class);
|
||||
|
||||
private final IncludeExclude<String> _methods = new IncludeExclude<>();
|
||||
private final IncludeExclude<String> _paths = new IncludeExclude<>(PathSpecSet.class);
|
||||
|
@ -71,10 +73,7 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
|
||||
public BufferedResponseHandler()
|
||||
{
|
||||
// include only GET requests
|
||||
|
||||
_methods.include(HttpMethod.GET.asString());
|
||||
// Exclude images, aduio and video from buffering
|
||||
for (String type : MimeTypes.getKnownMimeTypes())
|
||||
{
|
||||
if (type.startsWith("image/") ||
|
||||
|
@ -82,7 +81,9 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
type.startsWith("video/"))
|
||||
_mimeTypes.exclude(type);
|
||||
}
|
||||
LOG.debug("{} mime types {}", this, _mimeTypes);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} mime types {}", this, _mimeTypes);
|
||||
}
|
||||
|
||||
public IncludeExclude<String> getMethodIncludeExclude()
|
||||
|
@ -100,69 +101,6 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
return _mimeTypes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.handler.HandlerWrapper#handle(java.lang.String, org.eclipse.jetty.server.Request, javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
|
||||
*/
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
ServletContext context = baseRequest.getServletContext();
|
||||
String path = context == null ? baseRequest.getRequestURI() : URIUtil.addPaths(baseRequest.getServletPath(), baseRequest.getPathInfo());
|
||||
LOG.debug("{} handle {} in {}", this, baseRequest, context);
|
||||
|
||||
HttpOutput out = baseRequest.getResponse().getHttpOutput();
|
||||
|
||||
// Are we already being gzipped?
|
||||
HttpOutput.Interceptor interceptor = out.getInterceptor();
|
||||
while (interceptor != null)
|
||||
{
|
||||
if (interceptor instanceof BufferedInterceptor)
|
||||
{
|
||||
LOG.debug("{} already intercepting {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
interceptor = interceptor.getNextInterceptor();
|
||||
}
|
||||
|
||||
// If not a supported method - no Vary because no matter what client, this URI is always excluded
|
||||
if (!_methods.test(baseRequest.getMethod()))
|
||||
{
|
||||
LOG.debug("{} excluded by method {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
// If not a supported URI- no Vary because no matter what client, this URI is always excluded
|
||||
// Use pathInfo because this is be
|
||||
if (!isPathBufferable(path))
|
||||
{
|
||||
LOG.debug("{} excluded by path {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
// If the mime type is known from the path, then apply mime type filtering
|
||||
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
|
||||
if (mimeType != null)
|
||||
{
|
||||
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
|
||||
if (!isMimeTypeBufferable(mimeType))
|
||||
{
|
||||
LOG.debug("{} excluded by path suffix mime type {}", this, request);
|
||||
// handle normally without setting vary header
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// install interceptor and handle
|
||||
out.setInterceptor(new BufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
|
||||
|
||||
if (_handler != null)
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
}
|
||||
|
||||
protected boolean isMimeTypeBufferable(String mimetype)
|
||||
{
|
||||
return _mimeTypes.test(mimetype);
|
||||
|
@ -176,93 +114,115 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
return _paths.test(requestURI);
|
||||
}
|
||||
|
||||
private class BufferedInterceptor implements HttpOutput.Interceptor
|
||||
protected boolean shouldBuffer(HttpChannel channel, boolean last)
|
||||
{
|
||||
final Interceptor _next;
|
||||
final HttpChannel _channel;
|
||||
final Queue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
|
||||
Boolean _aggregating;
|
||||
ByteBuffer _aggregate;
|
||||
if (last)
|
||||
return false;
|
||||
|
||||
public BufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
|
||||
Response response = channel.getResponse();
|
||||
int status = response.getStatus();
|
||||
if (HttpStatus.hasNoBody(status) || HttpStatus.isRedirection(status))
|
||||
return false;
|
||||
|
||||
String ct = response.getContentType();
|
||||
if (ct == null)
|
||||
return true;
|
||||
|
||||
ct = MimeTypes.getContentTypeWithoutCharset(ct);
|
||||
return isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
ServletContext context = baseRequest.getServletContext();
|
||||
String path = context == null ? baseRequest.getRequestURI() : URIUtil.addPaths(baseRequest.getServletPath(), baseRequest.getPathInfo());
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} handle {} in {}", this, baseRequest, context);
|
||||
|
||||
// Are we already buffering?
|
||||
HttpOutput out = baseRequest.getResponse().getHttpOutput();
|
||||
HttpOutput.Interceptor interceptor = out.getInterceptor();
|
||||
while (interceptor != null)
|
||||
{
|
||||
if (interceptor instanceof BufferedInterceptor)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} already intercepting {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
interceptor = interceptor.getNextInterceptor();
|
||||
}
|
||||
|
||||
// If not a supported method this URI is always excluded.
|
||||
if (!_methods.test(baseRequest.getMethod()))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} excluded by method {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
// If not a supported path this URI is always excluded.
|
||||
if (!isPathBufferable(path))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} excluded by path {}", this, request);
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
// If the mime type is known from the path then apply mime type filtering.
|
||||
String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path);
|
||||
if (mimeType != null)
|
||||
{
|
||||
mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType);
|
||||
if (!isMimeTypeBufferable(mimeType))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} excluded by path suffix mime type {}", this, request);
|
||||
|
||||
// handle normally without setting vary header
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Install buffered interceptor and handle.
|
||||
out.setInterceptor(newBufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor()));
|
||||
if (_handler != null)
|
||||
_handler.handle(target, baseRequest, request, response);
|
||||
}
|
||||
|
||||
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
|
||||
{
|
||||
return new ArrayBufferedInterceptor(httpChannel, interceptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* An {@link HttpOutput.Interceptor} which is created by {@link #newBufferedInterceptor(HttpChannel, Interceptor)}
|
||||
* and is used by the implementation to buffer outgoing content.
|
||||
*/
|
||||
protected interface BufferedInterceptor extends HttpOutput.Interceptor
|
||||
{
|
||||
}
|
||||
|
||||
private class ArrayBufferedInterceptor implements BufferedInterceptor
|
||||
{
|
||||
private final Interceptor _next;
|
||||
private final HttpChannel _channel;
|
||||
private final Queue<ByteBuffer> _buffers = new ArrayDeque<>();
|
||||
private Boolean _aggregating;
|
||||
private ByteBuffer _aggregate;
|
||||
|
||||
public ArrayBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
|
||||
{
|
||||
_next = interceptor;
|
||||
_channel = httpChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetBuffer()
|
||||
{
|
||||
_buffers.clear();
|
||||
_aggregating = null;
|
||||
_aggregate = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer content, boolean last, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
|
||||
// if we are not committed, have to decide if we should aggregate or not
|
||||
if (_aggregating == null)
|
||||
{
|
||||
Response response = _channel.getResponse();
|
||||
int sc = response.getStatus();
|
||||
if (sc > 0 && (sc < 200 || sc == 204 || sc == 205 || sc >= 300))
|
||||
_aggregating = Boolean.FALSE; // No body
|
||||
else
|
||||
{
|
||||
String ct = response.getContentType();
|
||||
if (ct == null)
|
||||
_aggregating = Boolean.TRUE;
|
||||
else
|
||||
{
|
||||
ct = MimeTypes.getContentTypeWithoutCharset(ct);
|
||||
_aggregating = isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we are not aggregating, then handle normally
|
||||
if (!_aggregating.booleanValue())
|
||||
{
|
||||
getNextInterceptor().write(content, last, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
// If last
|
||||
if (last)
|
||||
{
|
||||
// Add the current content to the buffer list without a copy
|
||||
if (BufferUtil.length(content) > 0)
|
||||
_buffers.add(content);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} committing {}", this, _buffers.size());
|
||||
commit(_buffers, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} aggregating", this);
|
||||
|
||||
// Aggregate the content into buffer chain
|
||||
while (BufferUtil.hasContent(content))
|
||||
{
|
||||
// Do we need a new aggregate buffer
|
||||
if (BufferUtil.space(_aggregate) == 0)
|
||||
{
|
||||
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
|
||||
_aggregate = BufferUtil.allocate(size); // TODO use a buffer pool
|
||||
_buffers.add(_aggregate);
|
||||
}
|
||||
|
||||
BufferUtil.append(_aggregate, content);
|
||||
}
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interceptor getNextInterceptor()
|
||||
{
|
||||
|
@ -275,21 +235,82 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
return false;
|
||||
}
|
||||
|
||||
protected void commit(Queue<ByteBuffer> buffers, Callback callback)
|
||||
@Override
|
||||
public void resetBuffer()
|
||||
{
|
||||
// If only 1 buffer
|
||||
if (_buffers.size() == 0)
|
||||
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
|
||||
else if (_buffers.size() == 1)
|
||||
// just flush it with the last callback
|
||||
getNextInterceptor().write(_buffers.remove(), true, callback);
|
||||
_buffers.clear();
|
||||
_aggregating = null;
|
||||
_aggregate = null;
|
||||
BufferedInterceptor.super.resetBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer content, boolean last, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
|
||||
|
||||
// If we are not committed, have to decide if we should aggregate or not.
|
||||
if (_aggregating == null)
|
||||
_aggregating = shouldBuffer(_channel, last);
|
||||
|
||||
// If we are not aggregating, then handle normally.
|
||||
if (!_aggregating)
|
||||
{
|
||||
getNextInterceptor().write(content, last, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (last)
|
||||
{
|
||||
// Add the current content to the buffer list without a copy.
|
||||
if (BufferUtil.length(content) > 0)
|
||||
_buffers.offer(content);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} committing {}", this, _buffers.size());
|
||||
commit(callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create an iterating callback to do the writing
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} aggregating", this);
|
||||
|
||||
// Aggregate the content into buffer chain.
|
||||
while (BufferUtil.hasContent(content))
|
||||
{
|
||||
// Do we need a new aggregate buffer.
|
||||
if (BufferUtil.space(_aggregate) == 0)
|
||||
{
|
||||
// TODO: use a buffer pool always allocating with outputBufferSize to avoid polluting the ByteBufferPool.
|
||||
int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content));
|
||||
_aggregate = BufferUtil.allocate(size);
|
||||
_buffers.offer(_aggregate);
|
||||
}
|
||||
|
||||
BufferUtil.append(_aggregate, content);
|
||||
}
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
|
||||
private void commit(Callback callback)
|
||||
{
|
||||
if (_buffers.size() == 0)
|
||||
{
|
||||
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
|
||||
}
|
||||
else if (_buffers.size() == 1)
|
||||
{
|
||||
getNextInterceptor().write(_buffers.poll(), true, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create an iterating callback to do the writing.
|
||||
IteratingCallback icb = new IteratingCallback()
|
||||
{
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
protected Action process()
|
||||
{
|
||||
ByteBuffer buffer = _buffers.poll();
|
||||
if (buffer == null)
|
||||
|
@ -302,14 +323,14 @@ public class BufferedResponseHandler extends HandlerWrapper
|
|||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
// Signal last callback
|
||||
// Signal last callback.
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
// Signal last callback
|
||||
// Signal last callback.
|
||||
callback.failed(cause);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -0,0 +1,243 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.server.handler;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpOutput.Interceptor;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor}
|
||||
* mechanism to buffer the entire response content until the output is closed.
|
||||
* This allows the commit to be delayed until the response is complete and thus
|
||||
* headers and response status can be changed while writing the body.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note that the decision to buffer is influenced by the headers and status at the
|
||||
* first write, and thus subsequent changes to those headers will not influence the
|
||||
* decision to buffer or not.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note also that there are no memory limits to the size of the buffer, thus
|
||||
* this handler can represent an unbounded memory commitment if the content
|
||||
* generated can also be unbounded.
|
||||
* </p>
|
||||
*/
|
||||
public class FileBufferedResponseHandler extends BufferedResponseHandler
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(FileBufferedResponseHandler.class);
|
||||
|
||||
private Path _tempDir = new File(System.getProperty("java.io.tmpdir")).toPath();
|
||||
|
||||
public Path getTempDir()
|
||||
{
|
||||
return _tempDir;
|
||||
}
|
||||
|
||||
public void setTempDir(Path tempDir)
|
||||
{
|
||||
_tempDir = Objects.requireNonNull(tempDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
|
||||
{
|
||||
return new FileBufferedInterceptor(httpChannel, interceptor);
|
||||
}
|
||||
|
||||
private class FileBufferedInterceptor implements BufferedResponseHandler.BufferedInterceptor
|
||||
{
|
||||
private static final int MAX_MAPPED_BUFFER_SIZE = Integer.MAX_VALUE / 2;
|
||||
|
||||
private final Interceptor _next;
|
||||
private final HttpChannel _channel;
|
||||
private Boolean _aggregating;
|
||||
private Path _filePath;
|
||||
private OutputStream _fileOutputStream;
|
||||
|
||||
public FileBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor)
|
||||
{
|
||||
_next = interceptor;
|
||||
_channel = httpChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interceptor getNextInterceptor()
|
||||
{
|
||||
return _next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOptimizedForDirectBuffers()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetBuffer()
|
||||
{
|
||||
dispose();
|
||||
BufferedInterceptor.super.resetBuffer();
|
||||
}
|
||||
|
||||
private void dispose()
|
||||
{
|
||||
IO.close(_fileOutputStream);
|
||||
_fileOutputStream = null;
|
||||
_aggregating = null;
|
||||
|
||||
if (_filePath != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
Files.delete(_filePath);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn("Could not delete file {}", _filePath, t);
|
||||
}
|
||||
_filePath = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ByteBuffer content, boolean last, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content));
|
||||
|
||||
// If we are not committed, must decide if we should aggregate or not.
|
||||
if (_aggregating == null)
|
||||
_aggregating = shouldBuffer(_channel, last);
|
||||
|
||||
// If we are not aggregating, then handle normally.
|
||||
if (!_aggregating)
|
||||
{
|
||||
getNextInterceptor().write(content, last, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} aggregating", this);
|
||||
|
||||
try
|
||||
{
|
||||
if (BufferUtil.hasContent(content))
|
||||
aggregate(content);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
dispose();
|
||||
callback.failed(t);
|
||||
return;
|
||||
}
|
||||
|
||||
if (last)
|
||||
commit(callback);
|
||||
else
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void aggregate(ByteBuffer content) throws IOException
|
||||
{
|
||||
if (_fileOutputStream == null)
|
||||
{
|
||||
// Create a new OutputStream to a file.
|
||||
_filePath = Files.createTempFile(_tempDir, "BufferedResponse", "");
|
||||
_fileOutputStream = Files.newOutputStream(_filePath, StandardOpenOption.WRITE);
|
||||
}
|
||||
|
||||
BufferUtil.writeTo(content, _fileOutputStream);
|
||||
}
|
||||
|
||||
private void commit(Callback callback)
|
||||
{
|
||||
if (_fileOutputStream == null)
|
||||
{
|
||||
// We have no content to write, signal next interceptor that we are finished.
|
||||
getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_fileOutputStream.close();
|
||||
_fileOutputStream = null;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
dispose();
|
||||
callback.failed(t);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create an iterating callback to do the writing
|
||||
IteratingCallback icb = new IteratingCallback()
|
||||
{
|
||||
private final long fileLength = _filePath.toFile().length();
|
||||
private long _pos = 0;
|
||||
private boolean _last = false;
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
{
|
||||
if (_last)
|
||||
return Action.SUCCEEDED;
|
||||
|
||||
long len = Math.min(MAX_MAPPED_BUFFER_SIZE, fileLength - _pos);
|
||||
_last = (_pos + len == fileLength);
|
||||
ByteBuffer buffer = BufferUtil.toMappedBuffer(_filePath, _pos, len);
|
||||
getNextInterceptor().write(buffer, _last, this);
|
||||
_pos += len;
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
dispose();
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
dispose();
|
||||
callback.failed(cause);
|
||||
}
|
||||
};
|
||||
icb.iterate();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,620 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpTester;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.server.handler.FileBufferedResponseHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerCollection;
|
||||
import org.eclipse.jetty.toolchain.test.FS;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class FileBufferedResponseHandlerTest
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(FileBufferedResponseHandlerTest.class);
|
||||
|
||||
private Server _server;
|
||||
private LocalConnector _localConnector;
|
||||
private ServerConnector _serverConnector;
|
||||
private Path _testDir;
|
||||
private FileBufferedResponseHandler _bufferedHandler;
|
||||
|
||||
@BeforeEach
|
||||
public void before() throws Exception
|
||||
{
|
||||
_testDir = MavenTestingUtils.getTargetTestingPath(FileBufferedResponseHandlerTest.class.getName());
|
||||
FS.ensureDirExists(_testDir);
|
||||
|
||||
_server = new Server();
|
||||
HttpConfiguration config = new HttpConfiguration();
|
||||
config.setOutputBufferSize(1024);
|
||||
config.setOutputAggregationSize(256);
|
||||
|
||||
_localConnector = new LocalConnector(_server, new HttpConnectionFactory(config));
|
||||
_localConnector.setIdleTimeout(Duration.ofMinutes(1).toMillis());
|
||||
_server.addConnector(_localConnector);
|
||||
_serverConnector = new ServerConnector(_server, new HttpConnectionFactory(config));
|
||||
_server.addConnector(_serverConnector);
|
||||
|
||||
_bufferedHandler = new FileBufferedResponseHandler();
|
||||
_bufferedHandler.setTempDir(_testDir);
|
||||
_bufferedHandler.getPathIncludeExclude().include("/include/*");
|
||||
_bufferedHandler.getPathIncludeExclude().exclude("*.exclude");
|
||||
_bufferedHandler.getMimeIncludeExclude().exclude("text/excluded");
|
||||
_server.setHandler(_bufferedHandler);
|
||||
|
||||
FS.ensureEmpty(_testDir);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void after() throws Exception
|
||||
{
|
||||
_server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPathNotIncluded() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(10);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The response was committed after the first write and we never created a file to buffer the response into.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("Committed: true"));
|
||||
assertThat(responseContent, containsString("NumFiles: 0"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncludedByPath() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(10);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The response was not committed after the first write and a file was created to buffer the response.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("Committed: false"));
|
||||
assertThat(responseContent, containsString("NumFiles: 1"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExcludedByPath() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(10);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path.exclude HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The response was committed after the first write and we never created a file to buffer the response into.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("Committed: true"));
|
||||
assertThat(responseContent, containsString("NumFiles: 0"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExcludedByMime() throws Exception
|
||||
{
|
||||
String excludedMimeType = "text/excluded";
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setContentType(excludedMimeType);
|
||||
response.setBufferSize(10);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The response was committed after the first write and we never created a file to buffer the response into.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("Committed: true"));
|
||||
assertThat(responseContent, containsString("NumFiles: 0"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushed() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(1024);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string smaller than the buffer size");
|
||||
writer.println("NumFilesBeforeFlush: " + getNumFiles());
|
||||
writer.flush();
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The response was not committed after the buffer was flushed and a file was created to buffer the response.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("NumFilesBeforeFlush: 0"));
|
||||
assertThat(responseContent, containsString("Committed: false"));
|
||||
assertThat(responseContent, containsString("NumFiles: 1"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosed() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(10);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
writer.close();
|
||||
writer.println("writtenAfterClose");
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The content written after close was not sent.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, not(containsString("writtenAfterClose")));
|
||||
assertThat(responseContent, containsString("NumFiles: 1"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferSizeBig() throws Exception
|
||||
{
|
||||
int bufferSize = 4096;
|
||||
String largeContent = generateContent(bufferSize - 64);
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(bufferSize);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println(largeContent);
|
||||
writer.println("Committed: " + response.isCommitted());
|
||||
writer.println("NumFiles: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The content written was not buffered as a file as it was less than the buffer size.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, not(containsString("writtenAfterClose")));
|
||||
assertThat(responseContent, containsString("Committed: false"));
|
||||
assertThat(responseContent, containsString("NumFiles: 0"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushEmpty() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(1024);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.flush();
|
||||
int numFiles = getNumFiles();
|
||||
writer.println("NumFiles: " + numFiles);
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// The flush should not create the file unless there is content to write.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, containsString("NumFiles: 0"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() throws Exception
|
||||
{
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setBufferSize(8);
|
||||
PrintWriter writer = response.getWriter();
|
||||
writer.println("THIS WILL BE RESET");
|
||||
writer.flush();
|
||||
writer.println("THIS WILL BE RESET");
|
||||
int numFilesBeforeReset = getNumFiles();
|
||||
response.resetBuffer();
|
||||
int numFilesAfterReset = getNumFiles();
|
||||
|
||||
writer.println("NumFilesBeforeReset: " + numFilesBeforeReset);
|
||||
writer.println("NumFilesAfterReset: " + numFilesAfterReset);
|
||||
writer.println("a string larger than the buffer size");
|
||||
writer.println("NumFilesAfterWrite: " + getNumFiles());
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
String responseContent = response.getContent();
|
||||
|
||||
// Resetting the response buffer will delete the file.
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
assertThat(responseContent, not(containsString("THIS WILL BE RESET")));
|
||||
assertThat(responseContent, containsString("NumFilesBeforeReset: 1"));
|
||||
assertThat(responseContent, containsString("NumFilesAfterReset: 0"));
|
||||
assertThat(responseContent, containsString("NumFilesAfterWrite: 1"));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileLargerThanMaxInteger() throws Exception
|
||||
{
|
||||
long fileSize = Integer.MAX_VALUE + 1234L;
|
||||
byte[] bytes = randomBytes(1024 * 1024);
|
||||
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
ServletOutputStream outputStream = response.getOutputStream();
|
||||
|
||||
long written = 0;
|
||||
while (written < fileSize)
|
||||
{
|
||||
int length = Math.toIntExact(Math.min(bytes.length, fileSize - written));
|
||||
outputStream.write(bytes, 0, length);
|
||||
written += length;
|
||||
}
|
||||
outputStream.flush();
|
||||
|
||||
response.setHeader("NumFiles", Integer.toString(getNumFiles()));
|
||||
response.setHeader("FileSize", Long.toString(getFileSize()));
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
|
||||
AtomicLong received = new AtomicLong();
|
||||
HttpTester.Response response = new HttpTester.Response()
|
||||
{
|
||||
@Override
|
||||
public boolean content(ByteBuffer ref)
|
||||
{
|
||||
// Verify the content is what was sent.
|
||||
while (ref.hasRemaining())
|
||||
{
|
||||
byte byteFromBuffer = ref.get();
|
||||
long totalReceived = received.getAndIncrement();
|
||||
int bytesIndex = (int)(totalReceived % bytes.length);
|
||||
byte byteFromArray = bytes[bytesIndex];
|
||||
|
||||
if (byteFromBuffer != byteFromArray)
|
||||
{
|
||||
LOG.warn("Mismatch at index {} received bytes {}, {}!={}", bytesIndex, totalReceived, byteFromBuffer, byteFromArray, new IllegalStateException());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
try (Socket socket = new Socket("localhost", _serverConnector.getLocalPort()))
|
||||
{
|
||||
OutputStream output = socket.getOutputStream();
|
||||
String request = "GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n";
|
||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
||||
output.flush();
|
||||
|
||||
HttpTester.Input input = HttpTester.from(socket.getInputStream());
|
||||
HttpTester.parseResponse(input, response);
|
||||
}
|
||||
|
||||
assertTrue(response.isComplete());
|
||||
assertThat(response.get("NumFiles"), is("1"));
|
||||
assertThat(response.get("FileSize"), is(Long.toString(fileSize)));
|
||||
assertThat(received.get(), is(fileSize));
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNextInterceptorFailed() throws Exception
|
||||
{
|
||||
AbstractHandler failingInterceptorHandler = new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
HttpOutput httpOutput = baseRequest.getResponse().getHttpOutput();
|
||||
HttpOutput.Interceptor nextInterceptor = httpOutput.getInterceptor();
|
||||
httpOutput.setInterceptor(new HttpOutput.Interceptor()
|
||||
{
|
||||
@Override
|
||||
public void write(ByteBuffer content, boolean last, Callback callback)
|
||||
{
|
||||
callback.failed(new Throwable("intentionally throwing from interceptor"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpOutput.Interceptor getNextInterceptor()
|
||||
{
|
||||
return nextInterceptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOptimizedForDirectBuffers()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
_server.setHandler(new HandlerCollection(failingInterceptorHandler, _server.getHandler()));
|
||||
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
byte[] chunk1 = "this content will ".getBytes();
|
||||
byte[] chunk2 = "be buffered in a file".getBytes();
|
||||
response.setContentLength(chunk1.length + chunk2.length);
|
||||
ServletOutputStream outputStream = response.getOutputStream();
|
||||
|
||||
// Write chunk1 and then flush so it is written to the file.
|
||||
outputStream.write(chunk1);
|
||||
outputStream.flush();
|
||||
assertThat(getNumFiles(), is(1));
|
||||
|
||||
try
|
||||
{
|
||||
// ContentLength is set so it knows this is the last write.
|
||||
// This will cause the file to be written to the next interceptor which will fail.
|
||||
outputStream.write(chunk2);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
errorFuture.complete(t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
|
||||
// Response was aborted.
|
||||
assertThat(response.getStatus(), is(0));
|
||||
|
||||
// We failed because of the next interceptor.
|
||||
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
|
||||
assertThat(error.getMessage(), containsString("intentionally throwing from interceptor"));
|
||||
|
||||
// All files were deleted.
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileWriteFailed() throws Exception
|
||||
{
|
||||
// Set the temp directory to an empty directory so that the file cannot be created.
|
||||
File tempDir = MavenTestingUtils.getTargetTestingDir(getClass().getSimpleName());
|
||||
FS.ensureDeleted(tempDir);
|
||||
_bufferedHandler.setTempDir(tempDir.toPath());
|
||||
|
||||
CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
|
||||
_bufferedHandler.setHandler(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
ServletOutputStream outputStream = response.getOutputStream();
|
||||
byte[] content = "this content will be buffered in a file".getBytes();
|
||||
|
||||
try
|
||||
{
|
||||
// Write the content and flush it to the file.
|
||||
// This should throw as it cannot create the file to aggregate into.
|
||||
outputStream.write(content);
|
||||
outputStream.flush();
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
errorFuture.complete(t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
_server.start();
|
||||
String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
HttpTester.Response response = HttpTester.parseResponse(rawResponse);
|
||||
|
||||
// Response was aborted.
|
||||
assertThat(response.getStatus(), is(0));
|
||||
|
||||
// We failed because cannot create the file.
|
||||
Throwable error = errorFuture.get(5, TimeUnit.SECONDS);
|
||||
assertThat(error, instanceOf(NoSuchFileException.class));
|
||||
|
||||
// No files were created.
|
||||
assertThat(getNumFiles(), is(0));
|
||||
}
|
||||
|
||||
private int getNumFiles()
|
||||
{
|
||||
File[] files = _testDir.toFile().listFiles();
|
||||
if (files == null)
|
||||
return 0;
|
||||
|
||||
return files.length;
|
||||
}
|
||||
|
||||
private long getFileSize()
|
||||
{
|
||||
File[] files = _testDir.toFile().listFiles();
|
||||
assertNotNull(files);
|
||||
assertThat(files.length, is(1));
|
||||
return files[0].length();
|
||||
}
|
||||
|
||||
private static String generateContent(int size)
|
||||
{
|
||||
Random random = new Random();
|
||||
StringBuilder stringBuilder = new StringBuilder(size);
|
||||
for (int i = 0; i < size; i++)
|
||||
{
|
||||
stringBuilder.append((char)Math.abs(random.nextInt(0x7F)));
|
||||
}
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
private byte[] randomBytes(int size)
|
||||
{
|
||||
byte[] data = new byte[size];
|
||||
new Random().nextBytes(data);
|
||||
return data;
|
||||
}
|
||||
}
|
|
@ -30,6 +30,7 @@ import java.nio.channels.FileChannel;
|
|||
import java.nio.channels.FileChannel.MapMode;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -985,9 +986,14 @@ public class BufferUtil
|
|||
|
||||
public static ByteBuffer toMappedBuffer(File file) throws IOException
|
||||
{
|
||||
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ))
|
||||
return toMappedBuffer(file.toPath(), 0, file.length());
|
||||
}
|
||||
|
||||
public static ByteBuffer toMappedBuffer(Path filePath, long pos, long len) throws IOException
|
||||
{
|
||||
try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ))
|
||||
{
|
||||
return channel.map(MapMode.READ_ONLY, 0, file.length());
|
||||
return channel.map(MapMode.READ_ONLY, pos, len);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue