Merged branch 'jetty-9.2.x' into 'master'.
This commit is contained in:
commit
4a9d9eae94
|
@ -25,9 +25,9 @@
|
|||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.toolchain</groupId>
|
||||
<artifactId>jetty-test-helper</artifactId>
|
||||
<scope>test</scope>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
|
@ -35,6 +35,11 @@
|
|||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
|
@ -42,13 +47,14 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
<artifactId>jetty-util-ajax</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
<groupId>org.eclipse.jetty.toolchain</groupId>
|
||||
<artifactId>jetty-test-helper</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,461 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.proxy;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.nio.file.attribute.FileAttribute;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.util.component.Destroyable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>A specialized transformer for {@link AsyncMiddleManServlet} that performs
|
||||
* the transformation when the whole content has been received.</p>
|
||||
* <p>The content is buffered in memory up to a configurable {@link #getMaxInputBufferSize() maximum size},
|
||||
* after which it is overflown to a file on disk. The overflow file is saved
|
||||
* in the {@link #getOverflowDirectory() overflow directory} as a
|
||||
* {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
|
||||
* with a name starting with the {@link #getInputFilePrefix() input prefix}
|
||||
* and default suffix.</p>
|
||||
* <p>Application must implement the {@link #transform(Source, Sink) transformation method}
|
||||
* to transform the content.</p>
|
||||
* <p>The transformed content is buffered in memory up to a configurable {@link #getMaxOutputBufferSize() maximum size}
|
||||
* after which it is overflown to a file on disk. The overflow file is saved
|
||||
* in the {@link #getOverflowDirectory() overflow directory} as a
|
||||
* {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
|
||||
* with a name starting with the {@link #getOutputFilePrefix()} output prefix}
|
||||
* and default suffix.</p>
|
||||
*/
|
||||
public abstract class AfterContentTransformer implements AsyncMiddleManServlet.ContentTransformer, Destroyable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
|
||||
|
||||
private final List<ByteBuffer> buffers = new ArrayList<>();
|
||||
private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
|
||||
private String inputFilePrefix = "amms_adct_in_";
|
||||
private String outputFilePrefix = "amms_adct_out_";
|
||||
private long maxInputBufferSize = 1024 * 1024;
|
||||
private long inputBufferSize;
|
||||
private FileChannel inputFile;
|
||||
private long maxOutputBufferSize = maxInputBufferSize;
|
||||
private long outputBufferSize;
|
||||
private FileChannel outputFile;
|
||||
|
||||
/**
|
||||
* <p>Returns the directory where input and output are overflown to
|
||||
* temporary files if they exceed, respectively, the
|
||||
* {@link #getMaxInputBufferSize() max input size} or the
|
||||
* {@link #getMaxOutputBufferSize() max output size}.</p>
|
||||
* <p>Defaults to the directory pointed by the {@code java.io.tmpdir}
|
||||
* system property.</p>
|
||||
*
|
||||
* @return the overflow directory path
|
||||
* @see #setOverflowDirectory(Path)
|
||||
*/
|
||||
public Path getOverflowDirectory()
|
||||
{
|
||||
return overflowDirectory;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param overflowDirectory the overflow directory path
|
||||
* @see #getOverflowDirectory()
|
||||
*/
|
||||
public void setOverflowDirectory(Path overflowDirectory)
|
||||
{
|
||||
this.overflowDirectory = overflowDirectory;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the prefix of the input overflow temporary files
|
||||
* @see #setInputFilePrefix(String)
|
||||
*/
|
||||
public String getInputFilePrefix()
|
||||
{
|
||||
return inputFilePrefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param inputFilePrefix the prefix of the input overflow temporary files
|
||||
* @see #getInputFilePrefix()
|
||||
*/
|
||||
public void setInputFilePrefix(String inputFilePrefix)
|
||||
{
|
||||
this.inputFilePrefix = inputFilePrefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Returns the maximum input buffer size, after which the input is overflown to disk.</p>
|
||||
* <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
|
||||
*
|
||||
* @return the max input buffer size
|
||||
* @see #setMaxInputBufferSize(long)
|
||||
*/
|
||||
public long getMaxInputBufferSize()
|
||||
{
|
||||
return maxInputBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxInputBufferSize the max input buffer size
|
||||
* @see #getMaxInputBufferSize()
|
||||
*/
|
||||
public void setMaxInputBufferSize(long maxInputBufferSize)
|
||||
{
|
||||
this.maxInputBufferSize = maxInputBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the prefix of the output overflow temporary files
|
||||
* @see #setOutputFilePrefix(String)
|
||||
*/
|
||||
public String getOutputFilePrefix()
|
||||
{
|
||||
return outputFilePrefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param outputFilePrefix the prefix of the output overflow temporary files
|
||||
* @see #getOutputFilePrefix()
|
||||
*/
|
||||
public void setOutputFilePrefix(String outputFilePrefix)
|
||||
{
|
||||
this.outputFilePrefix = outputFilePrefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Returns the maximum output buffer size, after which the output is overflown to disk.</p>
|
||||
* <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
|
||||
*
|
||||
* @return the max output buffer size
|
||||
* @see #setMaxOutputBufferSize(long)
|
||||
*/
|
||||
public long getMaxOutputBufferSize()
|
||||
{
|
||||
return maxOutputBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxOutputBufferSize the max output buffer size
|
||||
*/
|
||||
public void setMaxOutputBufferSize(long maxOutputBufferSize)
|
||||
{
|
||||
this.maxOutputBufferSize = maxOutputBufferSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
|
||||
{
|
||||
int remaining = input.remaining();
|
||||
if (remaining > 0)
|
||||
{
|
||||
inputBufferSize += remaining;
|
||||
long max = getMaxInputBufferSize();
|
||||
if (max >= 0 && inputBufferSize > max)
|
||||
{
|
||||
overflow(input);
|
||||
}
|
||||
else
|
||||
{
|
||||
ByteBuffer copy = ByteBuffer.allocate(input.remaining());
|
||||
copy.put(input).flip();
|
||||
buffers.add(copy);
|
||||
}
|
||||
}
|
||||
|
||||
if (finished)
|
||||
{
|
||||
Source source = new Source();
|
||||
Sink sink = new Sink();
|
||||
transform(source, sink);
|
||||
sink.drainTo(output);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Transforms the original content read from the {@code source} into
|
||||
* transformed content written to the {@code sink}.</p>
|
||||
* <p>The transformation must happen synchronously in the context of a call
|
||||
* to this method (it is not supported to perform the transformation in another
|
||||
* thread spawned during the call to this method).</p>
|
||||
* <p>Differently from {@link #transform(ByteBuffer, boolean, List)}, this
|
||||
* method is invoked only when the whole content is available, and offers
|
||||
* a blocking API via the InputStream and OutputStream that can be obtained
|
||||
* from {@link Source} and {@link Sink} respectively.</p>
|
||||
* <p>Typical implementations:</p>
|
||||
* <pre>
|
||||
* // Identity transformation (no transformation, the input is copied to the output)
|
||||
* public void transform(Source source, Sink sink)
|
||||
* {
|
||||
* org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @param source where the original content is read
|
||||
* @param sink where the transformed content is written
|
||||
* @throws IOException if the transformation fails
|
||||
*/
|
||||
public abstract void transform(Source source, Sink sink) throws IOException;
|
||||
|
||||
private void overflow(ByteBuffer input) throws IOException
|
||||
{
|
||||
if (inputFile == null)
|
||||
{
|
||||
Path path = Files.createTempFile(getOverflowDirectory(), getInputFilePrefix(), null);
|
||||
inputFile = FileChannel.open(path,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.READ,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.DELETE_ON_CLOSE);
|
||||
int size = buffers.size();
|
||||
if (size > 0)
|
||||
{
|
||||
inputFile.write(buffers.toArray(new ByteBuffer[size]));
|
||||
buffers.clear();
|
||||
}
|
||||
}
|
||||
inputFile.write(input);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
close(inputFile);
|
||||
close(outputFile);
|
||||
}
|
||||
|
||||
private void close(Closeable closeable)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (closeable != null)
|
||||
closeable.close();
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
LOG.ignore(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The source from where the original content is read to be transformed.</p>
|
||||
* <p>The {@link #getInputStream() input stream} provided by this
|
||||
* class supports the {@link InputStream#reset()} method so that
|
||||
* the stream can be rewound to the beginning.</p>
|
||||
*/
|
||||
public class Source
|
||||
{
|
||||
private final InputStream stream;
|
||||
|
||||
private Source() throws IOException
|
||||
{
|
||||
if (inputFile != null)
|
||||
{
|
||||
inputFile.force(true);
|
||||
stream = new ChannelInputStream();
|
||||
}
|
||||
else
|
||||
{
|
||||
stream = new MemoryInputStream();
|
||||
}
|
||||
stream.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return an input stream to read the original content from
|
||||
*/
|
||||
public InputStream getInputStream()
|
||||
{
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
|
||||
private class ChannelInputStream extends InputStream
|
||||
{
|
||||
private final InputStream stream = Channels.newInputStream(inputFile);
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
return stream.read(b, off, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException
|
||||
{
|
||||
return stream.read();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException
|
||||
{
|
||||
inputFile.position(0);
|
||||
}
|
||||
}
|
||||
|
||||
private class MemoryInputStream extends InputStream
|
||||
{
|
||||
private final byte[] oneByte = new byte[1];
|
||||
private int index;
|
||||
private ByteBuffer slice;
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
if (len == 0)
|
||||
return 0;
|
||||
if (index == buffers.size())
|
||||
return -1;
|
||||
|
||||
if (slice == null)
|
||||
slice = buffers.get(index).slice();
|
||||
|
||||
int size = Math.min(len, slice.remaining());
|
||||
slice.get(b, off, size);
|
||||
|
||||
if (!slice.hasRemaining())
|
||||
{
|
||||
++index;
|
||||
slice = null;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException
|
||||
{
|
||||
int read = read(oneByte, 0, 1);
|
||||
return read < 0 ? read : oneByte[0] & 0xFF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException
|
||||
{
|
||||
index = 0;
|
||||
slice = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The target to where the transformed content is written after the transformation.</p>
|
||||
*/
|
||||
public class Sink
|
||||
{
|
||||
private final List<ByteBuffer> buffers = new ArrayList<>();
|
||||
private final OutputStream stream = new SinkOutputStream();
|
||||
|
||||
/**
|
||||
* @return an output stream to write the transformed content to
|
||||
*/
|
||||
public OutputStream getOutputStream()
|
||||
{
|
||||
return stream;
|
||||
}
|
||||
|
||||
private void overflow(ByteBuffer output) throws IOException
|
||||
{
|
||||
if (outputFile == null)
|
||||
{
|
||||
Path path = Files.createTempFile(getOverflowDirectory(), getOutputFilePrefix(), null);
|
||||
outputFile = FileChannel.open(path,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.READ,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.DELETE_ON_CLOSE);
|
||||
int size = buffers.size();
|
||||
if (size > 0)
|
||||
{
|
||||
outputFile.write(buffers.toArray(new ByteBuffer[size]));
|
||||
buffers.clear();
|
||||
}
|
||||
}
|
||||
outputFile.write(output);
|
||||
}
|
||||
|
||||
private void drainTo(List<ByteBuffer> output) throws IOException
|
||||
{
|
||||
if (outputFile == null)
|
||||
{
|
||||
output.addAll(buffers);
|
||||
buffers.clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
outputFile.force(true);
|
||||
long position = 0;
|
||||
long length = outputFile.size();
|
||||
outputFile.position(position);
|
||||
while (length > 0)
|
||||
{
|
||||
// At most 1 GiB file maps.
|
||||
long size = Math.min(1024 * 1024 * 1024, length);
|
||||
ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size);
|
||||
output.add(buffer);
|
||||
position += size;
|
||||
length -= size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class SinkOutputStream extends OutputStream
|
||||
{
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException
|
||||
{
|
||||
if (len <= 0)
|
||||
return;
|
||||
|
||||
outputBufferSize += len;
|
||||
long max = getMaxOutputBufferSize();
|
||||
if (max >= 0 && outputBufferSize > max)
|
||||
{
|
||||
overflow(ByteBuffer.wrap(b, off, len));
|
||||
}
|
||||
else
|
||||
{
|
||||
// The array may be reused by the
|
||||
// application so we need to copy it.
|
||||
byte[] copy = new byte[len];
|
||||
System.arraycopy(b, off, copy, 0, len);
|
||||
buffers.add(ByteBuffer.wrap(copy));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException
|
||||
{
|
||||
write(new byte[]{(byte)b}, 0, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -52,6 +52,7 @@ import org.eclipse.jetty.util.BufferUtil;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.CountingCallback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.component.Destroyable;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public class AsyncMiddleManServlet extends AbstractProxyServlet
|
||||
|
@ -142,6 +143,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
return ContentTransformer.IDENTITY;
|
||||
}
|
||||
|
||||
private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
transformer.transform(input, finished, output);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
_log.info("Exception while transforming " + transformer, x);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
||||
int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
|
||||
{
|
||||
return input.read(buffer);
|
||||
|
@ -170,6 +184,16 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
output.write(buffer, offset, length);
|
||||
}
|
||||
|
||||
private void cleanup(HttpServletRequest clientRequest)
|
||||
{
|
||||
ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
|
||||
if (clientTransformer instanceof Destroyable)
|
||||
((Destroyable)clientTransformer).destroy();
|
||||
ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
|
||||
if (serverTransformer instanceof Destroyable)
|
||||
((Destroyable)serverTransformer).destroy();
|
||||
}
|
||||
|
||||
protected class ProxyReader extends IteratingCallback implements ReadListener
|
||||
{
|
||||
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
|
||||
|
@ -218,6 +242,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
cleanup(clientRequest);
|
||||
onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
|
||||
}
|
||||
|
||||
|
@ -277,7 +302,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
}
|
||||
|
||||
int contentBytes = content.remaining();
|
||||
transformer.transform(content, finished, buffers);
|
||||
transform(transformer, content, finished, buffers);
|
||||
|
||||
int newContentBytes = 0;
|
||||
int size = buffers.size();
|
||||
|
@ -377,7 +402,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
length += contentBytes;
|
||||
|
||||
boolean finished = contentLength > 0 && length == contentLength;
|
||||
transformer.transform(content, finished, buffers);
|
||||
transform(transformer, content, finished, buffers);
|
||||
|
||||
int newContentBytes = 0;
|
||||
int size = buffers.size();
|
||||
|
@ -441,7 +466,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
|
||||
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
|
||||
|
||||
transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers);
|
||||
transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
|
||||
|
||||
long newContentBytes = 0;
|
||||
int size = buffers.size();
|
||||
|
@ -491,12 +516,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
cleanup(clientRequest);
|
||||
onProxyResponseSuccess(clientRequest, proxyResponse, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable failure)
|
||||
{
|
||||
cleanup(clientRequest);
|
||||
onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
|
||||
}
|
||||
}
|
||||
|
@ -723,4 +750,5 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
|
|||
return ByteBuffer.wrap(gzipBytes);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,11 +20,16 @@ package org.eclipse.jetty.proxy;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URLDecoder;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -54,6 +59,8 @@ import org.eclipse.jetty.client.util.BytesContentProvider;
|
|||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.client.util.FutureResponseListener;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.RuntimeIOException;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpConnectionFactory;
|
||||
|
@ -61,10 +68,12 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.toolchain.test.IO;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.Utf8StringBuilder;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
|
@ -289,7 +298,7 @@ public class AsyncMiddleManServletTest
|
|||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertArrayEquals(bytes, response.getContent());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTransformGzippedHead() throws Exception
|
||||
{
|
||||
|
@ -299,21 +308,21 @@ public class AsyncMiddleManServletTest
|
|||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
|
||||
|
||||
|
||||
String sample = "<a href=\"http://webtide.com/\">Webtide</a>\n<a href=\"http://google.com\">Google</a>\n";
|
||||
byte[] bytes = sample.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
|
||||
ServletOutputStream out = response.getOutputStream();
|
||||
out.write(gzip(bytes));
|
||||
|
||||
|
||||
// create a byte buffer larger enough to create 2 (or more) transforms.
|
||||
byte[] randomFiller = new byte[64*1024];
|
||||
byte[] randomFiller = new byte[64 * 1024];
|
||||
/* fill with nonsense
|
||||
* Using random data to ensure compressed buffer size is large
|
||||
* enough to trigger at least 2 transform() events.
|
||||
*/
|
||||
new Random().nextBytes(randomFiller);
|
||||
|
||||
|
||||
out.write(gzip(randomFiller));
|
||||
}
|
||||
});
|
||||
|
@ -333,7 +342,7 @@ public class AsyncMiddleManServletTest
|
|||
.send();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
|
||||
|
||||
String expectedStr = "<a href=\"http://webtide.com/\">Webtide</a>";
|
||||
byte[] expected = expectedStr.getBytes(StandardCharsets.UTF_8);
|
||||
Assert.assertArrayEquals(expected, response.getContent());
|
||||
|
@ -436,7 +445,7 @@ public class AsyncMiddleManServletTest
|
|||
{
|
||||
// decode input stream thru gzip
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
IO.copy(new GZIPInputStream(request.getInputStream()),bos);
|
||||
IO.copy(new GZIPInputStream(request.getInputStream()), bos);
|
||||
// ensure decompressed is 0 length
|
||||
Assert.assertEquals(0, bos.toByteArray().length);
|
||||
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
|
||||
|
@ -859,6 +868,340 @@ public class AsyncMiddleManServletTest
|
|||
Assert.assertEquals(502, response.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformer() throws Exception
|
||||
{
|
||||
final String key0 = "id";
|
||||
long value0 = 1;
|
||||
final String key1 = "channel";
|
||||
String value1 = "foo";
|
||||
final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
});
|
||||
final String key2 = "c";
|
||||
startProxy(new AsyncMiddleManServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
|
||||
{
|
||||
return new AfterContentTransformer()
|
||||
{
|
||||
@Override
|
||||
public void transform(Source source, Sink sink) throws IOException
|
||||
{
|
||||
InputStream input = source.getInputStream();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
|
||||
// Transform the object.
|
||||
obj.put(key2, obj.remove(key1));
|
||||
try (OutputStream output = sink.getOutputStream())
|
||||
{
|
||||
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
|
||||
Assert.assertNotNull(obj);
|
||||
Assert.assertEquals(2, obj.size());
|
||||
Assert.assertEquals(value0, obj.get(key0));
|
||||
Assert.assertEquals(value1, obj.get(key2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformerMemoryInputStreamReset() throws Exception
|
||||
{
|
||||
testAfterContentTransformerInputStreamReset(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformerDiskInputStreamReset() throws Exception
|
||||
{
|
||||
testAfterContentTransformerInputStreamReset(true);
|
||||
}
|
||||
|
||||
private void testAfterContentTransformerInputStreamReset(final boolean overflow) throws Exception
|
||||
{
|
||||
final byte[] data = new byte[]{'c', 'o', 'f', 'f', 'e', 'e'};
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
// Write the content in two chunks.
|
||||
int chunk = data.length / 2;
|
||||
ServletOutputStream output = response.getOutputStream();
|
||||
output.write(data, 0, chunk);
|
||||
sleep(1000);
|
||||
output.write(data, chunk, data.length - chunk);
|
||||
}
|
||||
});
|
||||
startProxy(new AsyncMiddleManServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
|
||||
{
|
||||
return new AfterContentTransformer()
|
||||
{
|
||||
{
|
||||
setMaxInputBufferSize(overflow ? data.length / 2 : data.length * 2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(Source source, Sink sink) throws IOException
|
||||
{
|
||||
// Consume the stream once.
|
||||
InputStream input = source.getInputStream();
|
||||
IO.copy(input, IO.getNullStream());
|
||||
|
||||
// Reset the stream and re-read it.
|
||||
input.reset();
|
||||
IO.copy(input, sink.getOutputStream());
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
Assert.assertArrayEquals(data, response.getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformerOverflowingToDisk() throws Exception
|
||||
{
|
||||
// Make sure the temporary directory we use exists and it's empty.
|
||||
final Path targetTestsDir = prepareTargetTestsDir();
|
||||
|
||||
final String key0 = "id";
|
||||
long value0 = 1;
|
||||
final String key1 = "channel";
|
||||
String value1 = "foo";
|
||||
final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
});
|
||||
final String inputPrefix = "in_";
|
||||
final String outputPrefix = "out_";
|
||||
final String key2 = "c";
|
||||
startProxy(new AsyncMiddleManServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
|
||||
{
|
||||
AfterContentTransformer transformer = new AfterContentTransformer()
|
||||
{
|
||||
@Override
|
||||
public void transform(Source source, Sink sink) throws IOException
|
||||
{
|
||||
InputStream input = source.getInputStream();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
|
||||
// Transform the object.
|
||||
obj.put(key2, obj.remove(key1));
|
||||
try (OutputStream output = sink.getOutputStream())
|
||||
{
|
||||
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
};
|
||||
transformer.setOverflowDirectory(targetTestsDir);
|
||||
int maxBufferSize = json.length() / 4;
|
||||
transformer.setMaxInputBufferSize(maxBufferSize);
|
||||
transformer.setInputFilePrefix(inputPrefix);
|
||||
transformer.setMaxOutputBufferSize(maxBufferSize);
|
||||
transformer.setOutputFilePrefix(outputPrefix);
|
||||
return transformer;
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
|
||||
Assert.assertNotNull(obj);
|
||||
Assert.assertEquals(2, obj.size());
|
||||
Assert.assertEquals(value0, obj.get(key0));
|
||||
Assert.assertEquals(value1, obj.get(key2));
|
||||
// Make sure the files do not exist.
|
||||
try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, inputPrefix + "*.*"))
|
||||
{
|
||||
Assert.assertFalse(paths.iterator().hasNext());
|
||||
}
|
||||
try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*"))
|
||||
{
|
||||
Assert.assertFalse(paths.iterator().hasNext());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformerClosingFilesOnClientRequestException() throws Exception
|
||||
{
|
||||
final Path targetTestsDir = prepareTargetTestsDir();
|
||||
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
IO.copy(request.getInputStream(), IO.getNullStream());
|
||||
}
|
||||
});
|
||||
final CountDownLatch destroyLatch = new CountDownLatch(1);
|
||||
startProxy(new AsyncMiddleManServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
|
||||
{
|
||||
return new AfterContentTransformer()
|
||||
{
|
||||
{
|
||||
setOverflowDirectory(targetTestsDir);
|
||||
setMaxInputBufferSize(0);
|
||||
setMaxOutputBufferSize(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(Source source, Sink sink) throws IOException
|
||||
{
|
||||
IO.copy(source.getInputStream(), sink.getOutputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
super.destroy();
|
||||
destroyLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
long idleTimeout = 1000;
|
||||
proxyConnector.setIdleTimeout(idleTimeout);
|
||||
startClient();
|
||||
|
||||
// Send only part of the content; the proxy will idle timeout.
|
||||
final byte[] data = new byte[]{'c', 'a', 'f', 'e'};
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
|
||||
.content(new BytesContentProvider(data)
|
||||
{
|
||||
@Override
|
||||
public long getLength()
|
||||
{
|
||||
return data.length + 1;
|
||||
}
|
||||
})
|
||||
.timeout(5 * idleTimeout, TimeUnit.MILLISECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertTrue(destroyLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertEquals(HttpStatus.GATEWAY_TIMEOUT_504, response.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAfterContentTransformerClosingFilesOnServerResponseException() throws Exception
|
||||
{
|
||||
final Path targetTestsDir = prepareTargetTestsDir();
|
||||
|
||||
final CountDownLatch serviceLatch = new CountDownLatch(1);
|
||||
startServer(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
|
||||
response.setContentLength(2);
|
||||
// Send only part of the content.
|
||||
OutputStream output = response.getOutputStream();
|
||||
output.write('x');
|
||||
output.flush();
|
||||
serviceLatch.countDown();
|
||||
}
|
||||
});
|
||||
final CountDownLatch destroyLatch = new CountDownLatch(1);
|
||||
startProxy(new AsyncMiddleManServlet()
|
||||
{
|
||||
@Override
|
||||
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
|
||||
{
|
||||
return new AfterContentTransformer()
|
||||
{
|
||||
{
|
||||
setOverflowDirectory(targetTestsDir);
|
||||
setMaxInputBufferSize(0);
|
||||
setMaxOutputBufferSize(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(Source source, Sink sink) throws IOException
|
||||
{
|
||||
IO.copy(source.getInputStream(), sink.getOutputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
super.destroy();
|
||||
destroyLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertTrue(serviceLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertTrue(destroyLatch.await(5, TimeUnit.SECONDS));
|
||||
Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus());
|
||||
}
|
||||
|
||||
private Path prepareTargetTestsDir() throws IOException
|
||||
{
|
||||
final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
|
||||
Files.createDirectories(targetTestsDir);
|
||||
try (DirectoryStream<Path> files = Files.newDirectoryStream(targetTestsDir, "*.*"))
|
||||
{
|
||||
for (Path file : files)
|
||||
{
|
||||
if (!Files.isDirectory(file))
|
||||
Files.delete(file);
|
||||
}
|
||||
}
|
||||
return targetTestsDir;
|
||||
}
|
||||
|
||||
private void sleep(long delay)
|
||||
{
|
||||
try
|
||||
|
@ -1027,7 +1370,7 @@ public class AsyncMiddleManServletTest
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A transformer that discards all but the first line of text.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue