Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-05-05 16:08:56 +02:00
commit 170b1ad8d0
2 changed files with 141 additions and 37 deletions

View File

@ -59,7 +59,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
private static final Logger LOG = Log.getLogger(AfterContentTransformer.class); private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
private final List<ByteBuffer> buffers = new ArrayList<>(); private final List<ByteBuffer> sourceBuffers = new ArrayList<>();
private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir")); private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
private String inputFilePrefix = "amms_adct_in_"; private String inputFilePrefix = "amms_adct_in_";
private String outputFilePrefix = "amms_adct_out_"; private String outputFilePrefix = "amms_adct_out_";
@ -188,7 +188,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
ByteBuffer copy = ByteBuffer.allocate(input.remaining()); ByteBuffer copy = ByteBuffer.allocate(input.remaining());
copy.put(input).flip(); copy.put(input).flip();
buffers.add(copy); sourceBuffers.add(copy);
} }
} }
@ -196,8 +196,10 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
Source source = new Source(); Source source = new Source();
Sink sink = new Sink(); Sink sink = new Sink();
transform(source, sink); if (transform(source, sink))
sink.drainTo(output); sink.drainTo(output);
else
source.drainTo(output);
} }
} }
@ -211,20 +213,28 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
* method is invoked only when the whole content is available, and offers * method is invoked only when the whole content is available, and offers
* a blocking API via the InputStream and OutputStream that can be obtained * a blocking API via the InputStream and OutputStream that can be obtained
* from {@link Source} and {@link Sink} respectively.</p> * from {@link Source} and {@link Sink} respectively.</p>
* <p>Implementations may read the source, inspect the input bytes and decide
* that no transformation is necessary, and therefore the source must be copied
* unchanged to the sink. In such case, the implementation must return false to
* indicate that it wishes to just pipe the bytes from the source to the sink.</p>
* <p>Typical implementations:</p> * <p>Typical implementations:</p>
* <pre> * <pre>
* // Identity transformation (no transformation, the input is copied to the output) * // Identity transformation (no transformation, the input is copied to the output)
* public void transform(Source source, Sink sink) * public boolean transform(Source source, Sink sink)
* { * {
* org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream()); * org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
* return true;
* } * }
* </pre> * </pre>
* *
* @param source where the original content is read * @param source where the original content is read
* @param sink where the transformed content is written * @param sink where the transformed content is written
* @return true if the transformation happened and the transformed bytes have
* been written to the sink, false if no transformation happened and the source
* must be copied to the sink.
* @throws IOException if the transformation fails * @throws IOException if the transformation fails
*/ */
public abstract void transform(Source source, Sink sink) throws IOException; public abstract boolean transform(Source source, Sink sink) throws IOException;
private void overflow(ByteBuffer input) throws IOException private void overflow(ByteBuffer input) throws IOException
{ {
@ -236,11 +246,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
StandardOpenOption.READ, StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE); StandardOpenOption.DELETE_ON_CLOSE);
int size = buffers.size(); int size = sourceBuffers.size();
if (size > 0) if (size > 0)
{ {
inputFile.write(buffers.toArray(new ByteBuffer[size])); inputFile.write(sourceBuffers.toArray(new ByteBuffer[size]));
buffers.clear(); sourceBuffers.clear();
} }
} }
inputFile.write(input); inputFile.write(input);
@ -253,6 +263,22 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
close(outputFile); close(outputFile);
} }
private void drain(FileChannel file, List<ByteBuffer> output) throws IOException
{
long position = 0;
long length = file.size();
file.position(position);
while (length > 0)
{
// At most 1 GiB file maps.
long size = Math.min(1024 * 1024 * 1024, length);
ByteBuffer buffer = file.map(FileChannel.MapMode.READ_ONLY, position, size);
output.add(buffer);
position += size;
length -= size;
}
}
private void close(Closeable closeable) private void close(Closeable closeable)
{ {
try try
@ -297,6 +323,19 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
return stream; return stream;
} }
private void drainTo(List<ByteBuffer> output) throws IOException
{
if (inputFile == null)
{
output.addAll(sourceBuffers);
sourceBuffers.clear();
}
else
{
drain(inputFile, output);
}
}
} }
private class ChannelInputStream extends InputStream private class ChannelInputStream extends InputStream
@ -333,11 +372,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
if (len == 0) if (len == 0)
return 0; return 0;
if (index == buffers.size()) if (index == sourceBuffers.size())
return -1; return -1;
if (slice == null) if (slice == null)
slice = buffers.get(index).slice(); slice = sourceBuffers.get(index).slice();
int size = Math.min(len, slice.remaining()); int size = Math.min(len, slice.remaining());
slice.get(b, off, size); slice.get(b, off, size);
@ -371,7 +410,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
*/ */
public class Sink public class Sink
{ {
private final List<ByteBuffer> buffers = new ArrayList<>(); private final List<ByteBuffer> sinkBuffers = new ArrayList<>();
private final OutputStream stream = new SinkOutputStream(); private final OutputStream stream = new SinkOutputStream();
/** /**
@ -392,11 +431,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
StandardOpenOption.READ, StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE); StandardOpenOption.DELETE_ON_CLOSE);
int size = buffers.size(); int size = sinkBuffers.size();
if (size > 0) if (size > 0)
{ {
outputFile.write(buffers.toArray(new ByteBuffer[size])); outputFile.write(sinkBuffers.toArray(new ByteBuffer[size]));
buffers.clear(); sinkBuffers.clear();
} }
} }
outputFile.write(output); outputFile.write(output);
@ -406,24 +445,13 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{ {
if (outputFile == null) if (outputFile == null)
{ {
output.addAll(buffers); output.addAll(sinkBuffers);
buffers.clear(); sinkBuffers.clear();
} }
else else
{ {
outputFile.force(true); outputFile.force(true);
long position = 0; drain(outputFile, output);
long length = outputFile.size();
outputFile.position(position);
while (length > 0)
{
// At most 1 GiB file maps.
long size = Math.min(1024 * 1024 * 1024, length);
ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size);
output.add(buffer);
position += size;
length -= size;
}
} }
} }
@ -447,7 +475,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
// application so we need to copy it. // application so we need to copy it.
byte[] copy = new byte[len]; byte[] copy = new byte[len];
System.arraycopy(b, off, copy, 0, len); System.arraycopy(b, off, copy, 0, len);
buffers.add(ByteBuffer.wrap(copy)); sinkBuffers.add(ByteBuffer.wrap(copy));
} }
} }

View File

@ -76,6 +76,7 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ajax.JSON; import org.eclipse.jetty.util.ajax.JSON;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -108,12 +109,12 @@ public class AsyncMiddleManServletTest
server.start(); server.start();
} }
private void startProxy(HttpServlet proxyServlet) throws Exception private void startProxy(AsyncMiddleManServlet proxyServlet) throws Exception
{ {
startProxy(proxyServlet, new HashMap<String, String>()); startProxy(proxyServlet, new HashMap<String, String>());
} }
private void startProxy(HttpServlet proxyServlet, Map<String, String> initParams) throws Exception private void startProxy(AsyncMiddleManServlet proxyServlet, Map<String, String> initParams) throws Exception
{ {
QueuedThreadPool proxyPool = new QueuedThreadPool(); QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy"); proxyPool.setName("proxy");
@ -134,6 +135,8 @@ public class AsyncMiddleManServletTest
proxyContext.addServlet(proxyServletHolder, "/*"); proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start(); proxy.start();
((StdErrLog)proxyServlet._log).setHideStacks(true);
} }
private void startClient() throws Exception private void startClient() throws Exception
@ -893,7 +896,7 @@ public class AsyncMiddleManServletTest
return new AfterContentTransformer() return new AfterContentTransformer()
{ {
@Override @Override
public void transform(Source source, Sink sink) throws IOException public boolean transform(Source source, Sink sink) throws IOException
{ {
InputStream input = source.getInputStream(); InputStream input = source.getInputStream();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -903,6 +906,7 @@ public class AsyncMiddleManServletTest
try (OutputStream output = sink.getOutputStream()) try (OutputStream output = sink.getOutputStream())
{ {
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8)); output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
return true;
} }
} }
}; };
@ -963,7 +967,7 @@ public class AsyncMiddleManServletTest
} }
@Override @Override
public void transform(Source source, Sink sink) throws IOException public boolean transform(Source source, Sink sink) throws IOException
{ {
// Consume the stream once. // Consume the stream once.
InputStream input = source.getInputStream(); InputStream input = source.getInputStream();
@ -972,6 +976,7 @@ public class AsyncMiddleManServletTest
// Reset the stream and re-read it. // Reset the stream and re-read it.
input.reset(); input.reset();
IO.copy(input, sink.getOutputStream()); IO.copy(input, sink.getOutputStream());
return true;
} }
}; };
} }
@ -1016,7 +1021,7 @@ public class AsyncMiddleManServletTest
AfterContentTransformer transformer = new AfterContentTransformer() AfterContentTransformer transformer = new AfterContentTransformer()
{ {
@Override @Override
public void transform(Source source, Sink sink) throws IOException public boolean transform(Source source, Sink sink) throws IOException
{ {
InputStream input = source.getInputStream(); InputStream input = source.getInputStream();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -1026,6 +1031,7 @@ public class AsyncMiddleManServletTest
try (OutputStream output = sink.getOutputStream()) try (OutputStream output = sink.getOutputStream())
{ {
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8)); output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
return true;
} }
} }
}; };
@ -1090,9 +1096,10 @@ public class AsyncMiddleManServletTest
} }
@Override @Override
public void transform(Source source, Sink sink) throws IOException public boolean transform(Source source, Sink sink) throws IOException
{ {
IO.copy(source.getInputStream(), sink.getOutputStream()); IO.copy(source.getInputStream(), sink.getOutputStream());
return true;
} }
@Override @Override
@ -1162,9 +1169,10 @@ public class AsyncMiddleManServletTest
} }
@Override @Override
public void transform(Source source, Sink sink) throws IOException public boolean transform(Source source, Sink sink) throws IOException
{ {
IO.copy(source.getInputStream(), sink.getOutputStream()); IO.copy(source.getInputStream(), sink.getOutputStream());
return true;
} }
@Override @Override
@ -1187,6 +1195,74 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus()); Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus());
} }
@Test
public void testAfterContentTransformerDoNotReadSourceDoNotTransform() throws Exception
{
testAfterContentTransformerDoNoTransform(false, false);
}
@Test
public void testAfterContentTransformerReadSourceDoNotTransform() throws Exception
{
testAfterContentTransformerDoNoTransform(true, true);
}
private void testAfterContentTransformerDoNoTransform(final boolean readSource, final boolean useDisk) throws Exception
{
final String key0 = "id";
long value0 = 1;
final String key1 = "channel";
String value1 = "foo";
final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
}
});
startProxy(new AsyncMiddleManServlet()
{
@Override
protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
{
return new AfterContentTransformer()
{
{
if (useDisk)
setMaxInputBufferSize(0);
}
@Override
public boolean transform(Source source, Sink sink) throws IOException
{
if (readSource)
{
InputStream input = source.getInputStream();
JSON.parse(new InputStreamReader(input, "UTF-8"));
}
// No transformation.
return false;
}
};
}
});
startClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
@SuppressWarnings("unchecked")
Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
Assert.assertNotNull(obj);
Assert.assertEquals(2, obj.size());
Assert.assertEquals(value0, obj.get(key0));
Assert.assertEquals(value1, obj.get(key1));
}
private Path prepareTargetTestsDir() throws IOException private Path prepareTargetTestsDir() throws IOException
{ {
final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath(); final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();