From 4c98990cd93e2f36032121d2ce76598b1c85ad1d Mon Sep 17 00:00:00 2001 From: Lachlan Date: Mon, 19 Apr 2021 11:02:44 +1000 Subject: [PATCH] Create FileBufferedResponseHandler to buffer responses into a file. (#6010) FileBufferedResponseHandler adds an HttpOutput.Interceptor to buffer all responses into a file until the output is closed. This allows the commit to be delayed until the response is complete and thus headers and response status can be changed while writing the body. Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/http/HttpStatus.java | 3 +- .../eclipse/jetty/io/ByteArrayEndPoint.java | 20 +- .../org/eclipse/jetty/server/HttpOutput.java | 2 +- .../handler/BufferedResponseHandler.java | 342 +++++----- .../handler/FileBufferedResponseHandler.java | 232 +++++++ .../FileBufferedResponseHandlerTest.java | 609 ++++++++++++++++++ .../org/eclipse/jetty/util/BufferUtil.java | 10 +- 7 files changed, 1048 insertions(+), 170 deletions(-) create mode 100644 jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java create mode 100644 jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java index 9c1f8ae80c2..cfe8eb37d40 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpStatus.java @@ -314,8 +314,9 @@ public class HttpStatus switch (status) { case NO_CONTENT_204: - case NOT_MODIFIED_304: + case RESET_CONTENT_205: case PARTIAL_CONTENT_206: + case NOT_MODIFIED_304: return true; default: diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index fd672e39912..9bd94727f72 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -42,6 +42,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint static final Logger LOG = LoggerFactory.getLogger(ByteArrayEndPoint.class); static final InetAddress NOIP; static final InetSocketAddress NOIPPORT; + private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 1024; static { @@ -67,6 +68,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint private final AutoLock _lock = new AutoLock(); private final Condition _hasOutput = _lock.newCondition(); private final Queue _inQ = new ArrayDeque<>(); + private final int _outputSize; private ByteBuffer _out; private boolean _growOutput; @@ -113,7 +115,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint super(timer); if (BufferUtil.hasContent(input)) addInput(input); - _out = output == null ? BufferUtil.allocate(1024) : output; + _outputSize = (output == null) ? 1024 : output.capacity(); + _out = output == null ? BufferUtil.allocate(_outputSize) : output; setIdleTimeout(idleTimeoutMs); onOpen(); } @@ -290,7 +293,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint try (AutoLock lock = _lock.lock()) { b = _out; - _out = BufferUtil.allocate(b.capacity()); + _out = BufferUtil.allocate(_outputSize); } getWriteFlusher().completeWrite(); return b; @@ -316,7 +319,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint return null; } b = _out; - _out = BufferUtil.allocate(b.capacity()); + _out = BufferUtil.allocate(_outputSize); } getWriteFlusher().completeWrite(); return b; @@ -424,9 +427,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint BufferUtil.compact(_out); if (b.remaining() > BufferUtil.space(_out)) { - ByteBuffer n = BufferUtil.allocate(_out.capacity() + b.remaining() * 2); - BufferUtil.append(n, _out); - _out = n; + // Don't grow larger than MAX_BUFFER_SIZE to avoid memory issues. + if (_out.capacity() < MAX_BUFFER_SIZE) + { + long newBufferCapacity = Math.min((long)(_out.capacity() + b.remaining() * 1.5), MAX_BUFFER_SIZE); + ByteBuffer n = BufferUtil.allocate(Math.toIntExact(newBufferCapacity)); + BufferUtil.append(n, _out); + _out = n; + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 7baeffa8442..65a182b24fd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -23,7 +23,6 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; -import java.util.ResourceBundle; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import javax.servlet.RequestDispatcher; @@ -627,6 +626,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable catch (Throwable t) { onWriteComplete(true, t); + throw t; } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java index 95fb38c85a3..78274594816 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java @@ -15,14 +15,15 @@ package org.eclipse.jetty.server.handler; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.pathmap.PathSpecSet; import org.eclipse.jetty.server.HttpChannel; @@ -39,16 +40,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Buffered Response Handler *

* A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor} * mechanism to buffer the entire response content until the output is closed. * This allows the commit to be delayed until the response is complete and thus * headers and response status can be changed while writing the body. + *

*

* Note that the decision to buffer is influenced by the headers and status at the * first write, and thus subsequent changes to those headers will not influence the * decision to buffer or not. + *

*

* Note also that there are no memory limits to the size of the buffer, thus * this handler can represent an unbounded memory commitment if the content @@ -57,7 +59,7 @@ import org.slf4j.LoggerFactory; */ public class BufferedResponseHandler extends HandlerWrapper { - static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class); private final IncludeExclude _methods = new IncludeExclude<>(); private final IncludeExclude _paths = new IncludeExclude<>(PathSpecSet.class); @@ -65,10 +67,7 @@ public class BufferedResponseHandler extends HandlerWrapper public BufferedResponseHandler() { - // include only GET requests - _methods.include(HttpMethod.GET.asString()); - // Exclude images, aduio and video from buffering for (String type : MimeTypes.getKnownMimeTypes()) { if (type.startsWith("image/") || @@ -76,7 +75,9 @@ public class BufferedResponseHandler extends HandlerWrapper type.startsWith("video/")) _mimeTypes.exclude(type); } - LOG.debug("{} mime types {}", this, _mimeTypes); + + if (LOG.isDebugEnabled()) + LOG.debug("{} mime types {}", this, _mimeTypes); } public IncludeExclude getMethodIncludeExclude() @@ -94,66 +95,6 @@ public class BufferedResponseHandler extends HandlerWrapper return _mimeTypes; } - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - final ServletContext context = baseRequest.getServletContext(); - final String path = baseRequest.getPathInContext(); - LOG.debug("{} handle {} in {}", this, baseRequest, context); - - HttpOutput out = baseRequest.getResponse().getHttpOutput(); - - // Are we already being gzipped? - HttpOutput.Interceptor interceptor = out.getInterceptor(); - while (interceptor != null) - { - if (interceptor instanceof BufferedInterceptor) - { - LOG.debug("{} already intercepting {}", this, request); - _handler.handle(target, baseRequest, request, response); - return; - } - interceptor = interceptor.getNextInterceptor(); - } - - // If not a supported method - no Vary because no matter what client, this URI is always excluded - if (!_methods.test(baseRequest.getMethod())) - { - LOG.debug("{} excluded by method {}", this, request); - _handler.handle(target, baseRequest, request, response); - return; - } - - // If not a supported URI- no Vary because no matter what client, this URI is always excluded - // Use pathInfo because this is be - if (!isPathBufferable(path)) - { - LOG.debug("{} excluded by path {}", this, request); - _handler.handle(target, baseRequest, request, response); - return; - } - - // If the mime type is known from the path, then apply mime type filtering - String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path); - if (mimeType != null) - { - mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType); - if (!isMimeTypeBufferable(mimeType)) - { - LOG.debug("{} excluded by path suffix mime type {}", this, request); - // handle normally without setting vary header - _handler.handle(target, baseRequest, request, response); - return; - } - } - - // install interceptor and handle - out.setInterceptor(new BufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor())); - - if (_handler != null) - _handler.handle(target, baseRequest, request, response); - } - protected boolean isMimeTypeBufferable(String mimetype) { return _mimeTypes.test(mimetype); @@ -167,116 +108,197 @@ public class BufferedResponseHandler extends HandlerWrapper return _paths.test(requestURI); } - private class BufferedInterceptor implements HttpOutput.Interceptor + protected boolean shouldBuffer(HttpChannel channel, boolean last) { - final Interceptor _next; - final HttpChannel _channel; - final Queue _buffers = new ConcurrentLinkedQueue<>(); - Boolean _aggregating; - ByteBuffer _aggregate; + if (last) + return false; - public BufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + Response response = channel.getResponse(); + int status = response.getStatus(); + if (HttpStatus.hasNoBody(status) || HttpStatus.isRedirection(status)) + return false; + + String ct = response.getContentType(); + if (ct == null) + return true; + + ct = MimeTypes.getContentTypeWithoutCharset(ct); + return isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct)); + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + final ServletContext context = baseRequest.getServletContext(); + final String path = baseRequest.getPathInContext(); + + if (LOG.isDebugEnabled()) + LOG.debug("{} handle {} in {}", this, baseRequest, context); + + // Are we already buffering? + HttpOutput out = baseRequest.getResponse().getHttpOutput(); + HttpOutput.Interceptor interceptor = out.getInterceptor(); + while (interceptor != null) + { + if (interceptor instanceof BufferedInterceptor) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} already intercepting {}", this, request); + _handler.handle(target, baseRequest, request, response); + return; + } + interceptor = interceptor.getNextInterceptor(); + } + + // If not a supported method this URI is always excluded. + if (!_methods.test(baseRequest.getMethod())) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by method {}", this, request); + _handler.handle(target, baseRequest, request, response); + return; + } + + // If not a supported path this URI is always excluded. + if (!isPathBufferable(path)) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by path {}", this, request); + _handler.handle(target, baseRequest, request, response); + return; + } + + // If the mime type is known from the path then apply mime type filtering. + String mimeType = context == null ? MimeTypes.getDefaultMimeByExtension(path) : context.getMimeType(path); + if (mimeType != null) + { + mimeType = MimeTypes.getContentTypeWithoutCharset(mimeType); + if (!isMimeTypeBufferable(mimeType)) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} excluded by path suffix mime type {}", this, request); + + // handle normally without setting vary header + _handler.handle(target, baseRequest, request, response); + return; + } + } + + // Install buffered interceptor and handle. + out.setInterceptor(newBufferedInterceptor(baseRequest.getHttpChannel(), out.getInterceptor())); + if (_handler != null) + _handler.handle(target, baseRequest, request, response); + } + + protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + { + return new ArrayBufferedInterceptor(httpChannel, interceptor); + } + + /** + * An {@link HttpOutput.Interceptor} which is created by {@link #newBufferedInterceptor(HttpChannel, Interceptor)} + * and is used by the implementation to buffer outgoing content. + */ + protected interface BufferedInterceptor extends HttpOutput.Interceptor + { + } + + private class ArrayBufferedInterceptor implements BufferedInterceptor + { + private final Interceptor _next; + private final HttpChannel _channel; + private final Queue _buffers = new ArrayDeque<>(); + private Boolean _aggregating; + private ByteBuffer _aggregate; + + public ArrayBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) { _next = interceptor; _channel = httpChannel; } - @Override - public void resetBuffer() - { - _buffers.clear(); - _aggregating = null; - _aggregate = null; - } - - ; - - @Override - public void write(ByteBuffer content, boolean last, Callback callback) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content)); - // if we are not committed, have to decide if we should aggregate or not - if (_aggregating == null) - { - Response response = _channel.getResponse(); - int sc = response.getStatus(); - if (sc > 0 && (sc < 200 || sc == 204 || sc == 205 || sc >= 300)) - _aggregating = Boolean.FALSE; // No body - else - { - String ct = response.getContentType(); - if (ct == null) - _aggregating = Boolean.TRUE; - else - { - ct = MimeTypes.getContentTypeWithoutCharset(ct); - _aggregating = isMimeTypeBufferable(StringUtil.asciiToLowerCase(ct)); - } - } - } - - // If we are not aggregating, then handle normally - if (!_aggregating.booleanValue()) - { - getNextInterceptor().write(content, last, callback); - return; - } - - // If last - if (last) - { - // Add the current content to the buffer list without a copy - if (BufferUtil.length(content) > 0) - _buffers.add(content); - - if (LOG.isDebugEnabled()) - LOG.debug("{} committing {}", this, _buffers.size()); - commit(_buffers, callback); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{} aggregating", this); - - // Aggregate the content into buffer chain - while (BufferUtil.hasContent(content)) - { - // Do we need a new aggregate buffer - if (BufferUtil.space(_aggregate) == 0) - { - int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content)); - _aggregate = BufferUtil.allocate(size); // TODO use a buffer pool - _buffers.add(_aggregate); - } - - BufferUtil.append(_aggregate, content); - } - callback.succeeded(); - } - } - @Override public Interceptor getNextInterceptor() { return _next; } - protected void commit(Queue buffers, Callback callback) + @Override + public void resetBuffer() { - // If only 1 buffer - if (_buffers.size() == 0) - getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback); - else if (_buffers.size() == 1) - // just flush it with the last callback - getNextInterceptor().write(_buffers.remove(), true, callback); + _buffers.clear(); + _aggregating = null; + _aggregate = null; + BufferedInterceptor.super.resetBuffer(); + } + + @Override + public void write(ByteBuffer content, boolean last, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content)); + + // If we are not committed, have to decide if we should aggregate or not. + if (_aggregating == null) + _aggregating = shouldBuffer(_channel, last); + + // If we are not aggregating, then handle normally. + if (!_aggregating) + { + getNextInterceptor().write(content, last, callback); + return; + } + + if (last) + { + // Add the current content to the buffer list without a copy. + if (BufferUtil.length(content) > 0) + _buffers.offer(content); + + if (LOG.isDebugEnabled()) + LOG.debug("{} committing {}", this, _buffers.size()); + commit(callback); + } else { - // Create an iterating callback to do the writing + if (LOG.isDebugEnabled()) + LOG.debug("{} aggregating", this); + + // Aggregate the content into buffer chain. + while (BufferUtil.hasContent(content)) + { + // Do we need a new aggregate buffer. + if (BufferUtil.space(_aggregate) == 0) + { + // TODO: use a buffer pool always allocating with outputBufferSize to avoid polluting the ByteBufferPool. + int size = Math.max(_channel.getHttpConfiguration().getOutputBufferSize(), BufferUtil.length(content)); + _aggregate = BufferUtil.allocate(size); + _buffers.offer(_aggregate); + } + + BufferUtil.append(_aggregate, content); + } + callback.succeeded(); + } + } + + private void commit(Callback callback) + { + if (_buffers.size() == 0) + { + getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback); + } + else if (_buffers.size() == 1) + { + getNextInterceptor().write(_buffers.poll(), true, callback); + } + else + { + // Create an iterating callback to do the writing. IteratingCallback icb = new IteratingCallback() { @Override - protected Action process() throws Exception + protected Action process() { ByteBuffer buffer = _buffers.poll(); if (buffer == null) @@ -289,14 +311,14 @@ public class BufferedResponseHandler extends HandlerWrapper @Override protected void onCompleteSuccess() { - // Signal last callback + // Signal last callback. callback.succeeded(); } @Override protected void onCompleteFailure(Throwable cause) { - // Signal last callback + // Signal last callback. callback.failed(cause); } }; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java new file mode 100644 index 00000000000..e6b6e0b36ee --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/FileBufferedResponseHandler.java @@ -0,0 +1,232 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server.handler; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; + +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpOutput.Interceptor; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.IteratingCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

+ * A Handler that can apply a {@link org.eclipse.jetty.server.HttpOutput.Interceptor} + * mechanism to buffer the entire response content until the output is closed. + * This allows the commit to be delayed until the response is complete and thus + * headers and response status can be changed while writing the body. + *

+ *

+ * Note that the decision to buffer is influenced by the headers and status at the + * first write, and thus subsequent changes to those headers will not influence the + * decision to buffer or not. + *

+ *

+ * Note also that there are no memory limits to the size of the buffer, thus + * this handler can represent an unbounded memory commitment if the content + * generated can also be unbounded. + *

+ */ +public class FileBufferedResponseHandler extends BufferedResponseHandler +{ + private static final Logger LOG = LoggerFactory.getLogger(FileBufferedResponseHandler.class); + + private Path _tempDir = new File(System.getProperty("java.io.tmpdir")).toPath(); + + public Path getTempDir() + { + return _tempDir; + } + + public void setTempDir(Path tempDir) + { + _tempDir = Objects.requireNonNull(tempDir); + } + + @Override + protected BufferedInterceptor newBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + { + return new FileBufferedInterceptor(httpChannel, interceptor); + } + + private class FileBufferedInterceptor implements BufferedResponseHandler.BufferedInterceptor + { + private static final int MAX_MAPPED_BUFFER_SIZE = Integer.MAX_VALUE / 2; + + private final Interceptor _next; + private final HttpChannel _channel; + private Boolean _aggregating; + private Path _filePath; + private OutputStream _fileOutputStream; + + public FileBufferedInterceptor(HttpChannel httpChannel, Interceptor interceptor) + { + _next = interceptor; + _channel = httpChannel; + } + + @Override + public Interceptor getNextInterceptor() + { + return _next; + } + + @Override + public void resetBuffer() + { + dispose(); + BufferedInterceptor.super.resetBuffer(); + } + + private void dispose() + { + IO.close(_fileOutputStream); + _fileOutputStream = null; + _aggregating = null; + + if (_filePath != null) + { + try + { + Files.delete(_filePath); + } + catch (Throwable t) + { + LOG.warn("Could not delete file {}", _filePath, t); + } + _filePath = null; + } + } + + @Override + public void write(ByteBuffer content, boolean last, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} write last={} {}", this, last, BufferUtil.toDetailString(content)); + + // If we are not committed, must decide if we should aggregate or not. + if (_aggregating == null) + _aggregating = shouldBuffer(_channel, last); + + // If we are not aggregating, then handle normally. + if (!_aggregating) + { + getNextInterceptor().write(content, last, callback); + return; + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} aggregating", this); + + try + { + if (BufferUtil.hasContent(content)) + aggregate(content); + } + catch (Throwable t) + { + dispose(); + callback.failed(t); + return; + } + + if (last) + commit(callback); + else + callback.succeeded(); + } + + private void aggregate(ByteBuffer content) throws IOException + { + if (_fileOutputStream == null) + { + // Create a new OutputStream to a file. + _filePath = Files.createTempFile(_tempDir, "BufferedResponse", ""); + _fileOutputStream = Files.newOutputStream(_filePath, StandardOpenOption.WRITE); + } + + BufferUtil.writeTo(content, _fileOutputStream); + } + + private void commit(Callback callback) + { + if (_fileOutputStream == null) + { + // We have no content to write, signal next interceptor that we are finished. + getNextInterceptor().write(BufferUtil.EMPTY_BUFFER, true, callback); + return; + } + + try + { + _fileOutputStream.close(); + _fileOutputStream = null; + } + catch (Throwable t) + { + dispose(); + callback.failed(t); + return; + } + + // Create an iterating callback to do the writing + IteratingCallback icb = new IteratingCallback() + { + private final long fileLength = _filePath.toFile().length(); + private long _pos = 0; + private boolean _last = false; + + @Override + protected Action process() throws Exception + { + if (_last) + return Action.SUCCEEDED; + + long len = Math.min(MAX_MAPPED_BUFFER_SIZE, fileLength - _pos); + _last = (_pos + len == fileLength); + ByteBuffer buffer = BufferUtil.toMappedBuffer(_filePath, _pos, len); + getNextInterceptor().write(buffer, _last, this); + _pos += len; + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() + { + dispose(); + callback.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + dispose(); + callback.failed(cause); + } + }; + icb.iterate(); + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java new file mode 100644 index 00000000000..c6bb92f8b3f --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/FileBufferedResponseHandlerTest.java @@ -0,0 +1,609 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.FileBufferedResponseHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.toolchain.test.FS; +import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FileBufferedResponseHandlerTest +{ + private static final Logger LOG = LoggerFactory.getLogger(FileBufferedResponseHandlerTest.class); + + private Server _server; + private LocalConnector _localConnector; + private ServerConnector _serverConnector; + private Path _testDir; + private FileBufferedResponseHandler _bufferedHandler; + + @BeforeEach + public void before() throws Exception + { + _testDir = MavenTestingUtils.getTargetTestingPath(FileBufferedResponseHandlerTest.class.getName()); + FS.ensureDirExists(_testDir); + + _server = new Server(); + HttpConfiguration config = new HttpConfiguration(); + config.setOutputBufferSize(1024); + config.setOutputAggregationSize(256); + + _localConnector = new LocalConnector(_server, new HttpConnectionFactory(config)); + _localConnector.setIdleTimeout(Duration.ofMinutes(1).toMillis()); + _server.addConnector(_localConnector); + _serverConnector = new ServerConnector(_server, new HttpConnectionFactory(config)); + _server.addConnector(_serverConnector); + + _bufferedHandler = new FileBufferedResponseHandler(); + _bufferedHandler.setTempDir(_testDir); + _bufferedHandler.getPathIncludeExclude().include("/include/*"); + _bufferedHandler.getPathIncludeExclude().exclude("*.exclude"); + _bufferedHandler.getMimeIncludeExclude().exclude("text/excluded"); + _server.setHandler(_bufferedHandler); + + FS.ensureEmpty(_testDir); + } + + @AfterEach + public void after() throws Exception + { + _server.stop(); + } + + @Test + public void testPathNotIncluded() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testIncludedByPath() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was not committed after the first write and a file was created to buffer the response. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testExcludedByPath() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path.exclude HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testExcludedByMime() throws Exception + { + String excludedMimeType = "text/excluded"; + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setContentType(excludedMimeType); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was committed after the first write and we never created a file to buffer the response into. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("Committed: true")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFlushed() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(1024); + PrintWriter writer = response.getWriter(); + writer.println("a string smaller than the buffer size"); + writer.println("NumFilesBeforeFlush: " + getNumFiles()); + writer.flush(); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The response was not committed after the buffer was flushed and a file was created to buffer the response. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("NumFilesBeforeFlush: 0")); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testClosed() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(10); + PrintWriter writer = response.getWriter(); + writer.println("a string larger than the buffer size"); + writer.println("NumFiles: " + getNumFiles()); + writer.close(); + writer.println("writtenAfterClose"); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The content written after close was not sent. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("writtenAfterClose"))); + assertThat(responseContent, containsString("NumFiles: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testBufferSizeBig() throws Exception + { + int bufferSize = 4096; + String largeContent = generateContent(bufferSize - 64); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(bufferSize); + PrintWriter writer = response.getWriter(); + writer.println(largeContent); + writer.println("Committed: " + response.isCommitted()); + writer.println("NumFiles: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The content written was not buffered as a file as it was less than the buffer size. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("writtenAfterClose"))); + assertThat(responseContent, containsString("Committed: false")); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFlushEmpty() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(1024); + PrintWriter writer = response.getWriter(); + writer.flush(); + int numFiles = getNumFiles(); + writer.println("NumFiles: " + numFiles); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // The flush should not create the file unless there is content to write. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, containsString("NumFiles: 0")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testReset() throws Exception + { + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + response.setBufferSize(8); + PrintWriter writer = response.getWriter(); + writer.println("THIS WILL BE RESET"); + writer.flush(); + writer.println("THIS WILL BE RESET"); + int numFilesBeforeReset = getNumFiles(); + response.resetBuffer(); + int numFilesAfterReset = getNumFiles(); + + writer.println("NumFilesBeforeReset: " + numFilesBeforeReset); + writer.println("NumFilesAfterReset: " + numFilesAfterReset); + writer.println("a string larger than the buffer size"); + writer.println("NumFilesAfterWrite: " + getNumFiles()); + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + String responseContent = response.getContent(); + + // Resetting the response buffer will delete the file. + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + assertThat(responseContent, not(containsString("THIS WILL BE RESET"))); + assertThat(responseContent, containsString("NumFilesBeforeReset: 1")); + assertThat(responseContent, containsString("NumFilesAfterReset: 0")); + assertThat(responseContent, containsString("NumFilesAfterWrite: 1")); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFileLargerThanMaxInteger() throws Exception + { + long fileSize = Integer.MAX_VALUE + 1234L; + byte[] bytes = randomBytes(1024 * 1024); + + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + ServletOutputStream outputStream = response.getOutputStream(); + + long written = 0; + while (written < fileSize) + { + int length = Math.toIntExact(Math.min(bytes.length, fileSize - written)); + outputStream.write(bytes, 0, length); + written += length; + } + outputStream.flush(); + + response.setHeader("NumFiles", Integer.toString(getNumFiles())); + response.setHeader("FileSize", Long.toString(getFileSize())); + } + }); + + _server.start(); + + AtomicLong received = new AtomicLong(); + HttpTester.Response response = new HttpTester.Response() + { + @Override + public boolean content(ByteBuffer ref) + { + // Verify the content is what was sent. + while (ref.hasRemaining()) + { + byte byteFromBuffer = ref.get(); + long totalReceived = received.getAndIncrement(); + int bytesIndex = (int)(totalReceived % bytes.length); + byte byteFromArray = bytes[bytesIndex]; + + if (byteFromBuffer != byteFromArray) + { + LOG.warn("Mismatch at index {} received bytes {}, {}!={}", bytesIndex, totalReceived, byteFromBuffer, byteFromArray, new IllegalStateException()); + return true; + } + } + + return false; + } + }; + + try (Socket socket = new Socket("localhost", _serverConnector.getLocalPort())) + { + OutputStream output = socket.getOutputStream(); + String request = "GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"; + output.write(request.getBytes(StandardCharsets.UTF_8)); + output.flush(); + + HttpTester.Input input = HttpTester.from(socket.getInputStream()); + HttpTester.parseResponse(input, response); + } + + assertTrue(response.isComplete()); + assertThat(response.get("NumFiles"), is("1")); + assertThat(response.get("FileSize"), is(Long.toString(fileSize))); + assertThat(received.get(), is(fileSize)); + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testNextInterceptorFailed() throws Exception + { + AbstractHandler failingInterceptorHandler = new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + HttpOutput httpOutput = baseRequest.getResponse().getHttpOutput(); + HttpOutput.Interceptor nextInterceptor = httpOutput.getInterceptor(); + httpOutput.setInterceptor(new HttpOutput.Interceptor() + { + @Override + public void write(ByteBuffer content, boolean last, Callback callback) + { + callback.failed(new Throwable("intentionally throwing from interceptor")); + } + + @Override + public HttpOutput.Interceptor getNextInterceptor() + { + return nextInterceptor; + } + }); + } + }; + + _server.setHandler(new HandlerCollection(failingInterceptorHandler, _server.getHandler())); + CompletableFuture errorFuture = new CompletableFuture<>(); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + byte[] chunk1 = "this content will ".getBytes(); + byte[] chunk2 = "be buffered in a file".getBytes(); + response.setContentLength(chunk1.length + chunk2.length); + ServletOutputStream outputStream = response.getOutputStream(); + + // Write chunk1 and then flush so it is written to the file. + outputStream.write(chunk1); + outputStream.flush(); + assertThat(getNumFiles(), is(1)); + + try + { + // ContentLength is set so it knows this is the last write. + // This will cause the file to be written to the next interceptor which will fail. + outputStream.write(chunk2); + } + catch (Throwable t) + { + errorFuture.complete(t); + throw t; + } + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + + // Response was aborted. + assertThat(response.getStatus(), is(0)); + + // We failed because of the next interceptor. + Throwable error = errorFuture.get(5, TimeUnit.SECONDS); + assertThat(error.getMessage(), containsString("intentionally throwing from interceptor")); + + // All files were deleted. + assertThat(getNumFiles(), is(0)); + } + + @Test + public void testFileWriteFailed() throws Exception + { + // Set the temp directory to an empty directory so that the file cannot be created. + File tempDir = MavenTestingUtils.getTargetTestingDir(getClass().getSimpleName()); + FS.ensureDeleted(tempDir); + _bufferedHandler.setTempDir(tempDir.toPath()); + + CompletableFuture errorFuture = new CompletableFuture<>(); + _bufferedHandler.setHandler(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + baseRequest.setHandled(true); + ServletOutputStream outputStream = response.getOutputStream(); + byte[] content = "this content will be buffered in a file".getBytes(); + + try + { + // Write the content and flush it to the file. + // This should throw as it cannot create the file to aggregate into. + outputStream.write(content); + outputStream.flush(); + } + catch (Throwable t) + { + errorFuture.complete(t); + throw t; + } + } + }); + + _server.start(); + String rawResponse = _localConnector.getResponse("GET /include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); + HttpTester.Response response = HttpTester.parseResponse(rawResponse); + + // Response was aborted. + assertThat(response.getStatus(), is(0)); + + // We failed because cannot create the file. + Throwable error = errorFuture.get(5, TimeUnit.SECONDS); + assertThat(error, instanceOf(NoSuchFileException.class)); + + // No files were created. + assertThat(getNumFiles(), is(0)); + } + + private int getNumFiles() + { + File[] files = _testDir.toFile().listFiles(); + if (files == null) + return 0; + + return files.length; + } + + private long getFileSize() + { + File[] files = _testDir.toFile().listFiles(); + assertNotNull(files); + assertThat(files.length, is(1)); + return files[0].length(); + } + + private static String generateContent(int size) + { + Random random = new Random(); + StringBuilder stringBuilder = new StringBuilder(size); + for (int i = 0; i < size; i++) + { + stringBuilder.append((char)Math.abs(random.nextInt(0x7F))); + } + return stringBuilder.toString(); + } + + @SuppressWarnings("SameParameterValue") + private byte[] randomBytes(int size) + { + byte[] data = new byte[size]; + new Random().nextBytes(data); + return data; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 0292223d1e1..227906aa53c 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -25,6 +25,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; @@ -1032,9 +1033,14 @@ public class BufferUtil public static ByteBuffer toMappedBuffer(File file) throws IOException { - try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) + return toMappedBuffer(file.toPath(), 0, file.length()); + } + + public static ByteBuffer toMappedBuffer(Path filePath, long pos, long len) throws IOException + { + try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ)) { - return channel.map(MapMode.READ_ONLY, 0, file.length()); + return channel.map(MapMode.READ_ONLY, pos, len); } }