Merge branch 'master' into jetty-8

This commit is contained in:
Jesse McConnell 2012-02-24 06:48:50 -06:00
commit 8b20316efc
5 changed files with 249 additions and 46 deletions

View File

@ -38,6 +38,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
private int _total_no_progress;
private final AsyncEndPoint _asyncEndp;
private boolean _readInterested = true;
public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
{
@ -103,29 +104,44 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{
some_progress|=progress;
// Is this request/response round complete and are fully flushed?
if (_parser.isComplete() && _generator.isComplete())
boolean parserComplete = _parser.isComplete();
boolean generatorComplete = _generator.isComplete();
boolean complete = parserComplete && generatorComplete;
if (parserComplete)
{
// Reset the parser/generator
progress=true;
// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
if (generatorComplete)
{
Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
// Reset the parser/generator
progress=true;
// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
if (switched!=null)
connection=switched;
}
reset();
// TODO Is this still required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
{
LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
_endp.shutdownOutput();
}
}
reset();
// TODO Is this still required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
else
{
LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
_endp.shutdownOutput();
// We have finished parsing, but not generating so
// we must not be interested in reading until we
// have finished generating and we reset the generator
_readInterested = false;
LOG.debug("Disabled read interest while writing response {}", _endp);
}
}
else if (_request.getAsyncContinuation().isAsyncStarted())
if (!complete && _request.getAsyncContinuation().isAsyncStarted())
{
// The request is suspended, so even though progress has been made,
// exit the while loop by setting progress to false
@ -183,4 +199,17 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
_parser.setPersistent(false);
}
@Override
public void reset()
{
_readInterested = true;
LOG.debug("Enabled read interest {}", _endp);
super.reset();
}
@Override
public boolean isSuspended()
{
return !_readInterested || super.isSuspended();
}
}

View File

@ -0,0 +1,159 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.lessThan;
public class SlowClientWithPipelinedRequestTest
{
private final AtomicInteger handles = new AtomicInteger();
private Server server;
private SelectChannelConnector connector;
public void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new SelectChannelConnector()
{
@Override
protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint)
{
return new AsyncHttpConnection(this, endpoint, getServer())
{
@Override
public Connection handle() throws IOException
{
handles.incrementAndGet();
return super.handle();
}
};
}
};
server.addConnector(connector);
connector.setPort(0);
server.setHandler(handler);
server.start();
}
@After
public void stopServer() throws Exception
{
if (server != null)
{
server.stop();
server.join();
}
}
@Test
public void testSlowClientWithPipelinedRequest() throws Exception
{
final int contentLength = 512 * 1024;
startServer(new AbstractHandler()
{
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
baseRequest.setHandled(true);
System.err.println("target = " + target);
if ("/content".equals(target))
{
// We simulate what the DefaultServlet does, bypassing the blocking
// write mechanism otherwise the test does not reproduce the bug
OutputStream outputStream = response.getOutputStream();
AbstractHttpConnection.Output output = (AbstractHttpConnection.Output)outputStream;
// Since the test is via localhost, we need a really big buffer to stall the write
byte[] bytes = new byte[contentLength];
Arrays.fill(bytes, (byte)'9');
Buffer buffer = new ByteArrayBuffer(bytes);
// Do a non blocking write
output.sendContent(buffer);
}
}
});
Socket client = new Socket("localhost", connector.getLocalPort());
OutputStream output = client.getOutputStream();
output.write(("" +
"GET /content HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"\r\n" +
"").getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
int read = input.read();
Assert.assertTrue(read >= 0);
// As soon as we can read the response, send a pipelined request
// so it is a different read for the server and it will trigger NIO
output.write(("" +
"GET /pipelined HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"\r\n" +
"").getBytes("UTF-8"));
output.flush();
// Simulate a slow reader
Thread.sleep(1000);
Assert.assertThat(handles.get(), lessThan(10));
// We are sure we are not spinning, read the content
StringBuilder lines = new StringBuilder().append((char)read);
int crlfs = 0;
while (true)
{
read = input.read();
lines.append((char)read);
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break;
}
Assert.assertTrue(lines.toString().contains(" 200 "));
// Read the body
for (int i = 0; i < contentLength; ++i)
input.read();
// Read the pipelined response
lines.setLength(0);
crlfs = 0;
while (true)
{
read = input.read();
lines.append((char)read);
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break;
}
Assert.assertTrue(lines.toString().contains(" 200 "));
client.close();
}
}

View File

@ -105,7 +105,7 @@ public class GzipWithPipeliningTest
client.connect();
// Request text that will be gzipped + chunked in the response
client.issueGET("/lots-of-fantasy-names.txt",true);
client.issueGET("/lots-of-fantasy-names.txt",true, false);
respHeader = client.readResponseHeader();
System.out.println("Response Header #1 --\n" + respHeader);
@ -125,7 +125,7 @@ public class GzipWithPipeliningTest
System.out.printf("Read %,d bytes%n",readBytes);
// Issue another request
client.issueGET("/jetty_logo.png",true);
client.issueGET("/jetty_logo.png",true, false);
// Finish reading chunks
System.out.println("Finish reading remaining chunks ...");

View File

@ -1,7 +1,5 @@
package org.eclipse.jetty.servlets;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -17,6 +15,8 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert;
import static org.hamcrest.Matchers.not;
public class PipelineHelper
{
private static final Logger LOG = Log.getLogger(PipelineHelper.class);
@ -64,7 +64,7 @@ public class PipelineHelper
* to turn on acceptance of GZIP compressed responses
* @throws IOException
*/
public void issueGET(String path, boolean acceptGzipped) throws IOException
public void issueGET(String path, boolean acceptGzipped, boolean close) throws IOException
{
LOG.debug("Issuing GET on " + path);
StringBuilder req = new StringBuilder();
@ -79,7 +79,15 @@ public class PipelineHelper
req.append("Accept-Encoding: gzip, deflate\r\n");
}
req.append("Cookie: JSESSIONID=spqx8v8szylt1336t96vc6mw0\r\n");
req.append("Connection: keep-alive\r\n");
if ( close )
{
req.append("Connection: close\r\n");
}
else
{
req.append("Connection: keep-alive\r\n");
}
req.append("\r\n");
LOG.debug("Request:" + req);
@ -189,6 +197,15 @@ public class PipelineHelper
while (left > 0)
{
int val = inputStream.read();
try
{
if (left % 10 == 0)
Thread.sleep(1);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
if (val == (-1))
{
Assert.fail(String.format("Encountered an early EOL (expected another %,d bytes)",left));

View File

@ -15,9 +15,6 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.helper;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -35,6 +32,8 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
public class SafariD00
{
private static final Logger LOG = Log.getLogger(SafariD00.class);
@ -103,23 +102,22 @@ public class SafariD00
out.write(buf,0,buf.length);
out.flush();
// Read HTTP 101 Upgrade / Handshake Response
InputStreamReader reader = new InputStreamReader(in);
BufferedReader br = new BufferedReader(reader);
socket.setSoTimeout(10000);
LOG.debug("Reading http header");
boolean foundEnd = false;
String line;
while (!foundEnd)
// Read HTTP 101 Upgrade / Handshake Response
InputStreamReader reader = new InputStreamReader(in);
LOG.debug("Reading http headers");
int crlfs = 0;
while (true)
{
line = br.readLine();
// System.out.printf("RESP: %s%n",line);
if (line.length() == 0)
{
foundEnd = true;
}
int read = in.read();
if (read == '\r' || read == '\n')
++crlfs;
else
crlfs = 0;
if (crlfs == 4)
break;
}
// Read expected handshake hixie bytes