From 400bdd1e9adb6f52f5b1a020d86e7ebba0cdd81c Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 16 Nov 2011 08:36:42 +1100 Subject: [PATCH] 349110 fixed bypass chunk handling --- .../jetty/client/SluggishServerTest.java | 250 ++++++++++++++++++ .../org/eclipse/jetty/http/HttpGenerator.java | 23 +- .../eclipse/jetty/io/bio/StreamEndPoint.java | 6 +- .../eclipse/jetty/io/nio/ChannelEndPoint.java | 2 +- 4 files changed, 267 insertions(+), 14 deletions(-) create mode 100644 jetty-client/src/test/java/org/eclipse/jetty/client/SluggishServerTest.java diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/SluggishServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/SluggishServerTest.java new file mode 100644 index 00000000000..1df4b292cdf --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/SluggishServerTest.java @@ -0,0 +1,250 @@ +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; + +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import junit.framework.Assert; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.http.HttpHeaderValues; +import org.eclipse.jetty.http.HttpHeaders; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.bio.SocketConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Test; + +/** + * A class that attempts to simulate a client communicating with a slow server. It imposes a delay between each handle() call made to the server's handler. + * + * The client sends a binary blob of data to the server, and this blob is then inspected to verify correct transfer. + */ +public class SluggishServerTest +{ + + /** msec to wait between reads in the handler -- may need to adjust based on OS/HW/etc. to reproduce bug */ + private final static int READ_DELAY = 5; + + private final static String URL = "http://localhost:"; + + /** Stream providing a binary message to send */ + private static class SluggishStream extends InputStream + { + private final byte[] request; + private int pos; + + public SluggishStream(byte[] request) + { + this.request = request; + this.pos = 0; + } + + @Override + public int read() throws IOException + { + if (pos < request.length) + { + int byteVal = request[pos++] & 0xFF; + return byteVal; + } + else + { + return -1; + } + } + + } + + /** Sends a message containing random binary content to a SluggishHandler */ + private static class SluggishExchange extends HttpExchange + { + private byte[] request; + + public SluggishExchange(int port, int count) + { + request = new byte[count]; + for (int i=0;i accumulatedRequest; + + public SluggishHandler(int requestSize) + { + accumulatedRequest = new ArrayList(requestSize); + } + + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + accumulatedRequest.clear(); + ServletInputStream input = request.getInputStream(); + byte[] buffer = new byte[16384]; + int bytesAvailable; + while ((bytesAvailable = input.read(buffer,0,buffer.length)) > 0) + { + //System.err.println("AVAILABLE FOR READ = " + bytesAvailable); + for (int n = 0; n < bytesAvailable; ++n) + { + accumulatedRequest.add(buffer[n]); + } + try + { + Thread.sleep(READ_DELAY); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + response.setStatus(HttpServletResponse.SC_OK); + baseRequest.setHandled(true); + //System.err.println("HANDLED"); + } + + public byte[] getAccumulatedRequest() + { + byte[] buffer = new byte[accumulatedRequest.size()]; + int pos = 0; + for (Byte b : accumulatedRequest) + { + buffer[pos++] = b; + } + return buffer; + } + } + + private static boolean compareBuffers(byte[] sent, byte[] received) + { + if (sent.length != received.length) + { + System.err.format("Mismatch in sent/received lengths: sent=%d received=%d\n",sent.length,received.length); + return false; + } + else + { + for (int n = 0; n < sent.length; ++n) + { + if (sent[n] != received[n]) + { + System.err.format("Mismatch at offset %d: request=%d response=%d\n",n,sent[n],received[n]); + return false; + } + } + } + return true; + } + + @Test + public void test0() throws Exception + { + goSlow(20000,10); + } + + @Test + public void test1() throws Exception + { + goSlow(200000,5); + } + + @Test + public void test2() throws Exception + { + goSlow(2000000,2); + } + + void goSlow(int requestSize,int iterations) throws Exception + { + Server server = new Server(); + SocketConnector connector = new SocketConnector(); + server.addConnector(connector); + SluggishHandler handler = new SluggishHandler(requestSize); + server.setHandler(handler); + server.start(); + int port = connector.getLocalPort(); + + HttpClient client = new HttpClient(); + client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + client.setConnectTimeout(5000); + client.setIdleTimeout(60000); + client.start(); + + try + { + for (int i = 0; i < iterations; ++i) + { + //System.err.format("-------------- ITERATION %d ------------------\n",i); + SluggishExchange exchange = new SluggishExchange(port,requestSize); + long startTime = System.currentTimeMillis(); + client.send(exchange); + exchange.waitForDone(); + long endTime = System.currentTimeMillis(); + //System.err.println("EXCHANGE STATUS = " + exchange); + //System.err.println("ELAPSED MSEC = " + (endTime - startTime)); + Assert.assertTrue(compareBuffers(exchange.getRequestBody(),handler.getAccumulatedRequest())); + } + } + finally + { + server.stop(); + server.join(); + } + } +} diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index e9a7b63dfdc..a238ecf40db 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -849,10 +849,7 @@ public class HttpGenerator extends AbstractGenerator int total= 0; int len = -1; - int to_flush = - ((_header != null && _header.length() > 0)?4:0) | - ((_buffer != null && _buffer.length() > 0)?2:0) | - ((_bypass && _content != null && _content.length() > 0)?1:0); + int to_flush = flushMask(); int last_flush; do @@ -923,15 +920,13 @@ public class HttpGenerator extends AbstractGenerator // Try to prepare more to write. prepareBuffers(); } + } if (len > 0) total+=len; - to_flush = - ((_header != null && _header.length() > 0)?4:0) | - ((_buffer != null && _buffer.length() > 0)?2:0) | - ((_bypass && _content != null && _content.length() > 0)?1:0); + to_flush = flushMask(); } // loop while progress is being made (OR we have prepared some buffers that might make progress) while (len>0 || (to_flush!=0 && last_flush==0)); @@ -944,7 +939,15 @@ public class HttpGenerator extends AbstractGenerator throw (e instanceof EofException) ? e:new EofException(e); } } - + + /* ------------------------------------------------------------ */ + private int flushMask() + { + return ((_header != null && _header.length() > 0)?4:0) + | ((_buffer != null && _buffer.length() > 0)?2:0) + | ((_bypass && _content != null && _content.length() > 0)?1:0); + } + /* ------------------------------------------------------------ */ private void prepareBuffers() { @@ -963,7 +966,7 @@ public class HttpGenerator extends AbstractGenerator // Chunk buffer if need be if (_contentLength == HttpTokens.CHUNKED_CONTENT) { - if ((_buffer==null||_buffer.length()==0) && _content!=null) + if (_bypass && (_buffer==null||_buffer.length()==0) && _content!=null) { // this is a bypass write int size = _content.length(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java index 71afbad21ba..0ebab0058d3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java @@ -133,10 +133,10 @@ public class StreamEndPoint implements EndPoint try { - int read=buffer.readFrom(_in, space); - if (read<0) + int filled=buffer.readFrom(_in, space); + if (filled<0) shutdownInput(); - return read; + return filled; } catch(SocketTimeoutException e) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java index 70802b69452..a8146bd0aeb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java @@ -341,7 +341,7 @@ public class ChannelEndPoint implements EndPoint trailer!=null && trailer.length()>0) length+=flush(trailer); } - + return length; }