464292 - Implement stream-based transformer for AsyncMiddleManServlet.

Introduced AfterContentTransformer with a transform(Source, Sink)
method that offers an InputStream to read the original content from,
and an OutputStream to write transformed content to.
This commit is contained in:
Simone Bordet 2015-04-09 15:15:27 +02:00
parent 990a045588
commit 4fbdafb9e9
4 changed files with 859 additions and 21 deletions

View File

@ -71,9 +71,9 @@
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@ -81,6 +81,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
@ -88,13 +93,14 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<artifactId>jetty-util-ajax</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,461 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.proxy;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A specialized transformer for {@link AsyncMiddleManServlet} that performs
* the transformation when the whole content has been received.</p>
* <p>The content is buffered in memory up to a configurable {@link #getMaxInputBufferSize() maximum size},
* after which it is overflown to a file on disk. The overflow file is saved
* in the {@link #getOverflowDirectory() overflow directory} as a
* {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
* with a name starting with the {@link #getInputFilePrefix() input prefix}
* and default suffix.</p>
* <p>Application must implement the {@link #transform(Source, Sink) transformation method}
* to transform the content.</p>
* <p>The transformed content is buffered in memory up to a configurable {@link #getMaxOutputBufferSize() maximum size}
* after which it is overflown to a file on disk. The overflow file is saved
* in the {@link #getOverflowDirectory() overflow directory} as a
* {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
* with a name starting with the {@link #getOutputFilePrefix()} output prefix}
* and default suffix.</p>
*/
public abstract class AfterContentTransformer implements AsyncMiddleManServlet.ContentTransformer, Destroyable
{
private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
private final List<ByteBuffer> buffers = new ArrayList<>();
private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
private String inputFilePrefix = "amms_adct_in_";
private String outputFilePrefix = "amms_adct_out_";
private long maxInputBufferSize = 1024 * 1024;
private long inputBufferSize;
private FileChannel inputFile;
private long maxOutputBufferSize = maxInputBufferSize;
private long outputBufferSize;
private FileChannel outputFile;
/**
* <p>Returns the directory where input and output are overflown to
* temporary files if they exceed, respectively, the
* {@link #getMaxInputBufferSize() max input size} or the
* {@link #getMaxOutputBufferSize() max output size}.</p>
* <p>Defaults to the directory pointed by the {@code java.io.tmpdir}
* system property.</p>
*
* @return the overflow directory path
* @see #setOverflowDirectory(Path)
*/
public Path getOverflowDirectory()
{
return overflowDirectory;
}
/**
* @param overflowDirectory the overflow directory path
* @see #getOverflowDirectory()
*/
public void setOverflowDirectory(Path overflowDirectory)
{
this.overflowDirectory = overflowDirectory;
}
/**
* @return the prefix of the input overflow temporary files
* @see #setInputFilePrefix(String)
*/
public String getInputFilePrefix()
{
return inputFilePrefix;
}
/**
* @param inputFilePrefix the prefix of the input overflow temporary files
* @see #getInputFilePrefix()
*/
public void setInputFilePrefix(String inputFilePrefix)
{
this.inputFilePrefix = inputFilePrefix;
}
/**
* <p>Returns the maximum input buffer size, after which the input is overflown to disk.</p>
* <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
*
* @return the max input buffer size
* @see #setMaxInputBufferSize(long)
*/
public long getMaxInputBufferSize()
{
return maxInputBufferSize;
}
/**
* @param maxInputBufferSize the max input buffer size
* @see #getMaxInputBufferSize()
*/
public void setMaxInputBufferSize(long maxInputBufferSize)
{
this.maxInputBufferSize = maxInputBufferSize;
}
/**
* @return the prefix of the output overflow temporary files
* @see #setOutputFilePrefix(String)
*/
public String getOutputFilePrefix()
{
return outputFilePrefix;
}
/**
* @param outputFilePrefix the prefix of the output overflow temporary files
* @see #getOutputFilePrefix()
*/
public void setOutputFilePrefix(String outputFilePrefix)
{
this.outputFilePrefix = outputFilePrefix;
}
/**
* <p>Returns the maximum output buffer size, after which the output is overflown to disk.</p>
* <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
*
* @return the max output buffer size
* @see #setMaxOutputBufferSize(long)
*/
public long getMaxOutputBufferSize()
{
return maxOutputBufferSize;
}
/**
* @param maxOutputBufferSize the max output buffer size
*/
public void setMaxOutputBufferSize(long maxOutputBufferSize)
{
this.maxOutputBufferSize = maxOutputBufferSize;
}
@Override
public final void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
int remaining = input.remaining();
if (remaining > 0)
{
inputBufferSize += remaining;
long max = getMaxInputBufferSize();
if (max >= 0 && inputBufferSize > max)
{
overflow(input);
}
else
{
ByteBuffer copy = ByteBuffer.allocate(input.remaining());
copy.put(input).flip();
buffers.add(copy);
}
}
if (finished)
{
Source source = new Source();
Sink sink = new Sink();
transform(source, sink);
sink.drainTo(output);
}
}
/**
* <p>Transforms the original content read from the {@code source} into
* transformed content written to the {@code sink}.</p>
* <p>The transformation must happen synchronously in the context of a call
* to this method (it is not supported to perform the transformation in another
* thread spawned during the call to this method).</p>
* <p>Differently from {@link #transform(ByteBuffer, boolean, List)}, this
* method is invoked only when the whole content is available, and offers
* a blocking API via the InputStream and OutputStream that can be obtained
* from {@link Source} and {@link Sink} respectively.</p>
* <p>Typical implementations:</p>
* <pre>
* // Identity transformation (no transformation, the input is copied to the output)
* public void transform(Source source, Sink sink)
* {
* org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
* }
* </pre>
*
* @param source where the original content is read
* @param sink where the transformed content is written
* @throws IOException if the transformation fails
*/
public abstract void transform(Source source, Sink sink) throws IOException;
private void overflow(ByteBuffer input) throws IOException
{
if (inputFile == null)
{
Path path = Files.createTempFile(getOverflowDirectory(), getInputFilePrefix(), null);
inputFile = FileChannel.open(path,
StandardOpenOption.CREATE,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE);
int size = buffers.size();
if (size > 0)
{
inputFile.write(buffers.toArray(new ByteBuffer[size]));
buffers.clear();
}
}
inputFile.write(input);
}
@Override
public void destroy()
{
close(inputFile);
close(outputFile);
}
private void close(Closeable closeable)
{
try
{
if (closeable != null)
closeable.close();
}
catch (IOException x)
{
LOG.ignore(x);
}
}
/**
* <p>The source from where the original content is read to be transformed.</p>
* <p>The {@link #getInputStream() input stream} provided by this
* class supports the {@link InputStream#reset()} method so that
* the stream can be rewound to the beginning.</p>
*/
public class Source
{
private final InputStream stream;
private Source() throws IOException
{
if (inputFile != null)
{
inputFile.force(true);
stream = new ChannelInputStream();
}
else
{
stream = new MemoryInputStream();
}
stream.reset();
}
/**
* @return an input stream to read the original content from
*/
public InputStream getInputStream()
{
return stream;
}
}
private class ChannelInputStream extends InputStream
{
private final InputStream stream = Channels.newInputStream(inputFile);
@Override
public int read(byte[] b, int off, int len) throws IOException
{
return stream.read(b, off, len);
}
@Override
public int read() throws IOException
{
return stream.read();
}
@Override
public void reset() throws IOException
{
inputFile.position(0);
}
}
private class MemoryInputStream extends InputStream
{
private final byte[] oneByte = new byte[1];
private int index;
private ByteBuffer slice;
@Override
public int read(byte[] b, int off, int len) throws IOException
{
if (len == 0)
return 0;
if (index == buffers.size())
return -1;
if (slice == null)
slice = buffers.get(index).slice();
int size = Math.min(len, slice.remaining());
slice.get(b, off, size);
if (!slice.hasRemaining())
{
++index;
slice = null;
}
return size;
}
@Override
public int read() throws IOException
{
int read = read(oneByte, 0, 1);
return read < 0 ? read : oneByte[0] & 0xFF;
}
@Override
public void reset() throws IOException
{
index = 0;
slice = null;
}
}
/**
* <p>The target to where the transformed content is written after the transformation.</p>
*/
public class Sink
{
private final List<ByteBuffer> buffers = new ArrayList<>();
private final OutputStream stream = new SinkOutputStream();
/**
* @return an output stream to write the transformed content to
*/
public OutputStream getOutputStream()
{
return stream;
}
private void overflow(ByteBuffer output) throws IOException
{
if (outputFile == null)
{
Path path = Files.createTempFile(getOverflowDirectory(), getOutputFilePrefix(), null);
outputFile = FileChannel.open(path,
StandardOpenOption.CREATE,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE);
int size = buffers.size();
if (size > 0)
{
outputFile.write(buffers.toArray(new ByteBuffer[size]));
buffers.clear();
}
}
outputFile.write(output);
}
private void drainTo(List<ByteBuffer> output) throws IOException
{
if (outputFile == null)
{
output.addAll(buffers);
buffers.clear();
}
else
{
outputFile.force(true);
long position = 0;
long length = outputFile.size();
outputFile.position(position);
while (length > 0)
{
// At most 1 GiB file maps.
long size = Math.min(1024 * 1024 * 1024, length);
ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size);
output.add(buffer);
position += size;
length -= size;
}
}
}
private class SinkOutputStream extends OutputStream
{
@Override
public void write(byte[] b, int off, int len) throws IOException
{
if (len <= 0)
return;
outputBufferSize += len;
long max = getMaxOutputBufferSize();
if (max >= 0 && outputBufferSize > max)
{
overflow(ByteBuffer.wrap(b, off, len));
}
else
{
// The array may be reused by the
// application so we need to copy it.
byte[] copy = new byte[len];
System.arraycopy(b, off, copy, 0, len);
buffers.add(ByteBuffer.wrap(copy));
}
}
@Override
public void write(int b) throws IOException
{
write(new byte[]{(byte)b}, 0, 1);
}
}
}
}

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.Destroyable;
@SuppressWarnings("serial")
public class AsyncMiddleManServlet extends AbstractProxyServlet
@ -141,6 +142,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return ContentTransformer.IDENTITY;
}
private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
{
try
{
transformer.transform(input, finished, output);
}
catch (Throwable x)
{
_log.info("Exception while transforming " + transformer, x);
throw x;
}
}
int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
{
return input.read(buffer);
@ -169,6 +183,16 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
output.write(buffer, offset, length);
}
private void cleanup(HttpServletRequest clientRequest)
{
ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
if (clientTransformer instanceof Destroyable)
((Destroyable)clientTransformer).destroy();
ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
if (serverTransformer instanceof Destroyable)
((Destroyable)serverTransformer).destroy();
}
protected class ProxyReader extends IteratingCallback implements ReadListener
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
@ -217,6 +241,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onError(Throwable t)
{
cleanup(clientRequest);
onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
}
@ -272,7 +297,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
int contentBytes = content.remaining();
transformer.transform(content, finished, buffers);
transform(transformer, content, finished, buffers);
int newContentBytes = 0;
int size = buffers.size();
@ -372,7 +397,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
length += contentBytes;
boolean finished = contentLength > 0 && length == contentLength;
transformer.transform(content, finished, buffers);
transform(transformer, content, finished, buffers);
int newContentBytes = 0;
int size = buffers.size();
@ -436,7 +461,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers);
transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
long newContentBytes = 0;
int size = buffers.size();
@ -486,12 +511,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void succeeded()
{
cleanup(clientRequest);
onProxyResponseSuccess(clientRequest, proxyResponse, response);
}
@Override
public void failed(Throwable failure)
{
cleanup(clientRequest);
onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
}
}
@ -718,4 +745,5 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return ByteBuffer.wrap(gzipBytes);
}
}
}

View File

@ -20,10 +20,16 @@ package org.eclipse.jetty.proxy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -34,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
@ -53,6 +58,8 @@ import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -60,10 +67,12 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.IO;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ajax.JSON;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -289,7 +298,7 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(bytes, response.getContent());
}
@Test
public void testTransformGzippedHead() throws Exception
{
@ -299,21 +308,21 @@ public class AsyncMiddleManServletTest
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
String sample = "<a href=\"http://webtide.com/\">Webtide</a>\n<a href=\"http://google.com\">Google</a>\n";
byte[] bytes = sample.getBytes(StandardCharsets.UTF_8);
ServletOutputStream out = response.getOutputStream();
out.write(gzip(bytes));
// create a byte buffer larger enough to create 2 (or more) transforms.
byte[] randomFiller = new byte[64*1024];
byte[] randomFiller = new byte[64 * 1024];
/* fill with nonsense
* Using random data to ensure compressed buffer size is large
* enough to trigger at least 2 transform() events.
*/
new Random().nextBytes(randomFiller);
out.write(gzip(randomFiller));
}
});
@ -333,7 +342,7 @@ public class AsyncMiddleManServletTest
.send();
Assert.assertEquals(200, response.getStatus());
String expectedStr = "<a href=\"http://webtide.com/\">Webtide</a>";
byte[] expected = expectedStr.getBytes(StandardCharsets.UTF_8);
Assert.assertArrayEquals(expected, response.getContent());
@ -436,7 +445,7 @@ public class AsyncMiddleManServletTest
{
// decode input stream thru gzip
ByteArrayOutputStream bos = new ByteArrayOutputStream();
IO.copy(new GZIPInputStream(request.getInputStream()),bos);
IO.copy(new GZIPInputStream(request.getInputStream()), bos);
// ensure decompressed is 0 length
Assert.assertEquals(0, bos.toByteArray().length);
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
@ -837,6 +846,340 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(502, response.getStatus());
}
@Test
public void testAfterContentTransformer() throws Exception
{
final String key0 = "id";
long value0 = 1;
final String key1 = "channel";
String value1 = "foo";
final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
});
final String key2 = "c";
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new AfterContentTransformer()
{
@Override
public void transform(Source source, Sink sink) throws IOException
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
// Transform the object.
obj.put(key2, obj.remove(key1));
try (OutputStream output = sink.getOutputStream())
{
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
}
}
};
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
Assert.assertNotNull(obj);
Assert.assertEquals(2, obj.size());
Assert.assertEquals(value0, obj.get(key0));
Assert.assertEquals(value1, obj.get(key2));
}
@Test
public void testAfterContentTransformerMemoryInputStreamReset() throws Exception
{
testAfterContentTransformerInputStreamReset(false);
}
@Test
public void testAfterContentTransformerDiskInputStreamReset() throws Exception
{
testAfterContentTransformerInputStreamReset(true);
}
private void testAfterContentTransformerInputStreamReset(final boolean overflow) throws Exception
{
final byte[] data = new byte[]{'c', 'o', 'f', 'f', 'e', 'e'};
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
// Write the content in two chunks.
int chunk = data.length / 2;
ServletOutputStream output = response.getOutputStream();
output.write(data, 0, chunk);
sleep(1000);
output.write(data, chunk, data.length - chunk);
}
});
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new AfterContentTransformer()
{
{
setMaxInputBufferSize(overflow ? data.length / 2 : data.length * 2);
}
@Override
public void transform(Source source, Sink sink) throws IOException
{
// Consume the stream once.
InputStream input = source.getInputStream();
IO.copy(input, IO.getNullStream());
// Reset the stream and re-read it.
input.reset();
IO.copy(input, sink.getOutputStream());
}
};
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
@Test
public void testAfterContentTransformerOverflowingToDisk() throws Exception
{
// Make sure the temporary directory we use exists and it's empty.
final Path targetTestsDir = prepareTargetTestsDir();
final String key0 = "id";
long value0 = 1;
final String key1 = "channel";
String value1 = "foo";
final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
});
final String inputPrefix = "in_";
final String outputPrefix = "out_";
final String key2 = "c";
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
AfterContentTransformer transformer = new AfterContentTransformer()
{
@Override
public void transform(Source source, Sink sink) throws IOException
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
// Transform the object.
obj.put(key2, obj.remove(key1));
try (OutputStream output = sink.getOutputStream())
{
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
}
}
};
transformer.setOverflowDirectory(targetTestsDir);
int maxBufferSize = json.length() / 4;
transformer.setMaxInputBufferSize(maxBufferSize);
transformer.setInputFilePrefix(inputPrefix);
transformer.setMaxOutputBufferSize(maxBufferSize);
transformer.setOutputFilePrefix(outputPrefix);
return transformer;
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
Assert.assertNotNull(obj);
Assert.assertEquals(2, obj.size());
Assert.assertEquals(value0, obj.get(key0));
Assert.assertEquals(value1, obj.get(key2));
// Make sure the files do not exist.
try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, inputPrefix + "*.*"))
{
Assert.assertFalse(paths.iterator().hasNext());
}
try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*"))
{
Assert.assertFalse(paths.iterator().hasNext());
}
}
@Test
public void testAfterContentTransformerClosingFilesOnClientRequestException() throws Exception
{
final Path targetTestsDir = prepareTargetTestsDir();
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
IO.copy(request.getInputStream(), IO.getNullStream());
}
});
final CountDownLatch destroyLatch = new CountDownLatch(1);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
{
return new AfterContentTransformer()
{
{
setOverflowDirectory(targetTestsDir);
setMaxInputBufferSize(0);
setMaxOutputBufferSize(0);
}
@Override
public void transform(Source source, Sink sink) throws IOException
{
IO.copy(source.getInputStream(), sink.getOutputStream());
}
@Override
public void destroy()
{
super.destroy();
destroyLatch.countDown();
}
};
}
});
long idleTimeout = 1000;
proxyConnector.setIdleTimeout(idleTimeout);
startClient();
// Send only part of the content; the proxy will idle timeout.
final byte[] data = new byte[]{'c', 'a', 'f', 'e'};
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.content(new BytesContentProvider(data)
{
@Override
public long getLength()
{
return data.length + 1;
}
})
.timeout(5 * idleTimeout, TimeUnit.MILLISECONDS)
.send();
Assert.assertTrue(destroyLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(HttpStatus.GATEWAY_TIMEOUT_504, response.getStatus());
}
@Test
public void testAfterContentTransformerClosingFilesOnServerResponseException() throws Exception
{
final Path targetTestsDir = prepareTargetTestsDir();
final CountDownLatch serviceLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
response.setContentLength(2);
// Send only part of the content.
OutputStream output = response.getOutputStream();
output.write('x');
output.flush();
serviceLatch.countDown();
}
});
final CountDownLatch destroyLatch = new CountDownLatch(1);
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new AfterContentTransformer()
{
{
setOverflowDirectory(targetTestsDir);
setMaxInputBufferSize(0);
setMaxOutputBufferSize(0);
}
@Override
public void transform(Source source, Sink sink) throws IOException
{
IO.copy(source.getInputStream(), sink.getOutputStream());
}
@Override
public void destroy()
{
super.destroy();
destroyLatch.countDown();
}
};
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertTrue(serviceLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(destroyLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus());
}
private Path prepareTargetTestsDir() throws IOException
{
final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir);
try (DirectoryStream<Path> files = Files.newDirectoryStream(targetTestsDir, "*.*"))
{
for (Path file : files)
{
if (!Files.isDirectory(file))
Files.delete(file);
}
}
return targetTestsDir;
}
private void sleep(long delay)
{
try
@ -1005,7 +1348,7 @@ public class AsyncMiddleManServletTest
}
}
}
/**
* A transformer that discards all but the first line of text.
*/