jetty-9 \o/ first suspend resume cycles on jetty-9

This commit is contained in:
Greg Wilkins 2012-05-25 11:33:04 +02:00
parent c274cb6413
commit a0dae20ac0
6 changed files with 125 additions and 18 deletions

View File

@ -222,6 +222,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void close() public void close()
{ {
_closed=true; _closed=true;
onClose();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -413,6 +413,8 @@ public abstract class HttpChannel
completed(); completed();
} }
} }
LOG.debug("{} !process",this);
} }
} }

View File

@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint; import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -45,12 +46,25 @@ public class LocalHttpConnector extends HttpConnector
return this; return this;
} }
/* ------------------------------------------------------------ */
/** Sends requests and get's responses based on thread activity.
* Returns all the responses received once the thread activity has
* returned to the level it was before the requests.
* @param requests
* @return
* @throws Exception
*/
public String getResponses(String requests) throws Exception public String getResponses(String requests) throws Exception
{ {
ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StringUtil.__UTF8_CHARSET)); ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StringUtil.__UTF8_CHARSET));
return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET); return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET);
} }
/* ------------------------------------------------------------ */
/** Sends requests and get's responses based on thread activity.
* Returns all the responses received once the thread activity has
* returned to the level it was before the requests.
*/
public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
{ {
LOG.debug("getResponses"); LOG.debug("getResponses");
@ -63,6 +77,13 @@ public class LocalHttpConnector extends HttpConnector
return request.takeOutput(); return request.takeOutput();
} }
/* ------------------------------------------------------------ */
/**
* Execute a request and return the EndPoint through which
* responses can be received.
* @param rawRequest
* @return
*/
public LocalEndPoint executeRequest(String rawRequest) public LocalEndPoint executeRequest(String rawRequest)
{ {
Phaser phaser=_executor._phaser; Phaser phaser=_executor._phaser;
@ -148,6 +169,8 @@ public class LocalHttpConnector extends HttpConnector
public class LocalEndPoint extends AsyncByteArrayEndPoint public class LocalEndPoint extends AsyncByteArrayEndPoint
{ {
private CountDownLatch _closed = new CountDownLatch(1);
LocalEndPoint() LocalEndPoint()
{ {
setGrowOutput(true); setGrowOutput(true);
@ -166,6 +189,41 @@ public class LocalHttpConnector extends HttpConnector
Thread.yield(); Thread.yield();
setInput(BufferUtil.toBuffer(s,StringUtil.__UTF8_CHARSET)); setInput(BufferUtil.toBuffer(s,StringUtil.__UTF8_CHARSET));
} }
@Override
public void onClose()
{
super.onClose();
_closed.countDown();
}
@Override
public void shutdownOutput()
{
super.shutdownOutput();
close();
}
public void waitUntilClosed()
{
while (isOpen())
{
try
{
if (!_closed.await(10,TimeUnit.SECONDS))
{
System.err.println("wait timeout:\n--");
System.err.println(takeOutputString());
System.err.println("==");
break;
}
}
catch(Exception e)
{
LOG.warn(e);
}
}
}
} }
} }

View File

@ -23,7 +23,9 @@ import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener; import javax.servlet.AsyncListener;
import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.server.session.SessionHandler;
import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -44,6 +46,9 @@ public class LocalAsyncContextTest
_server.setHandler(session); _server.setHandler(session);
_server.start(); _server.start();
__completed.set(0);
__completed1.set(0);
} }
protected Connector initConnector() protected Connector initConnector()
@ -59,11 +64,9 @@ public class LocalAsyncContextTest
} }
@Test @Test
public void testSuspendResume() throws Exception public void testSuspendTimeout() throws Exception
{ {
String response; String response;
__completed.set(0);
__completed1.set(0);
_handler.setRead(0); _handler.setRead(0);
_handler.setSuspendFor(1000); _handler.setSuspendFor(1000);
_handler.setResumeAfter(-1); _handler.setResumeAfter(-1);
@ -72,19 +75,38 @@ public class LocalAsyncContextTest
check(response,"TIMEOUT"); check(response,"TIMEOUT");
assertEquals(1,__completed.get()); assertEquals(1,__completed.get());
assertEquals(1,__completed1.get()); assertEquals(1,__completed1.get());
}
@Test
public void testSuspendResume0() throws Exception
{
String response;
_handler.setRead(0);
_handler.setSuspendFor(10000); _handler.setSuspendFor(10000);
_handler.setResumeAfter(0); _handler.setResumeAfter(0);
_handler.setCompleteAfter(-1); _handler.setCompleteAfter(-1);
response=process(null); response=process(null);
check(response,"DISPATCHED"); check(response,"STARTASYNC","DISPATCHED");
}
@Test
public void testSuspendResume100() throws Exception
{
String response;
_handler.setRead(0);
_handler.setSuspendFor(10000);
_handler.setResumeAfter(100); _handler.setResumeAfter(100);
_handler.setCompleteAfter(-1); _handler.setCompleteAfter(-1);
response=process(null); response=process(null);
check(response,"DISPATCHED"); check(response,"STARTASYNC","DISPATCHED");
}
@Test
public void testSuspendOther() throws Exception
{
String response;
_handler.setRead(0);
_handler.setSuspendFor(10000);
_handler.setResumeAfter(-1); _handler.setResumeAfter(-1);
_handler.setCompleteAfter(0); _handler.setCompleteAfter(0);
response=process(null); response=process(null);
@ -162,15 +184,14 @@ public class LocalAsyncContextTest
protected void check(String response,String... content) protected void check(String response,String... content)
{ {
assertEquals("HTTP/1.1 200 OK",response.substring(0,15)); Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 200 OK"));
int i=0; int i=0;
for (String m:content) for (String m:content)
{ {
Assert.assertThat(response,Matchers.containsString(m));
i=response.indexOf(m,i); i=response.indexOf(m,i);
assertTrue(i>=0);
i+=m.length(); i+=m.length();
} }
} }
private synchronized String process(String content) throws Exception private synchronized String process(String content) throws Exception
@ -184,12 +205,18 @@ public class LocalAsyncContextTest
else else
request+="Content-Length: "+content.length()+"\r\n" +"\r\n" + content; request+="Content-Length: "+content.length()+"\r\n" +"\r\n" + content;
return getResponse(request); System.err.println("REQUEST: "+request);
String response=getResponse(request);
System.err.println("RESPONSE: "+response);
return response;
} }
protected String getResponse(String request) throws Exception protected String getResponse(String request) throws Exception
{ {
return ((LocalHttpConnector)_connector).getResponses(request); LocalHttpConnector connector=(LocalHttpConnector)_connector;
LocalHttpConnector.LocalEndPoint endp = connector.executeRequest(request);
endp.waitUntilClosed();
return endp.takeOutputString();
} }
@ -203,18 +230,21 @@ public class LocalAsyncContextTest
@Override @Override
public void onComplete(AsyncEvent event) throws IOException public void onComplete(AsyncEvent event) throws IOException
{ {
System.err.println("onComplete");
__completed.incrementAndGet(); __completed.incrementAndGet();
} }
@Override @Override
public void onError(AsyncEvent event) throws IOException public void onError(AsyncEvent event) throws IOException
{ {
System.err.println("onError");
__completed.incrementAndGet(); __completed.incrementAndGet();
} }
@Override @Override
public void onStartAsync(AsyncEvent event) throws IOException public void onStartAsync(AsyncEvent event) throws IOException
{ {
System.err.println("onStartAsync");
event.getSuppliedResponse().getOutputStream().println("startasync"); event.getSuppliedResponse().getOutputStream().println("startasync");
event.getAsyncContext().addListener(this); event.getAsyncContext().addListener(this);
} }
@ -222,6 +252,7 @@ public class LocalAsyncContextTest
@Override @Override
public void onTimeout(AsyncEvent event) throws IOException public void onTimeout(AsyncEvent event) throws IOException
{ {
System.err.println("onTimeout - dispatch!");
event.getSuppliedRequest().setAttribute("TIMEOUT",Boolean.TRUE); event.getSuppliedRequest().setAttribute("TIMEOUT",Boolean.TRUE);
event.getAsyncContext().dispatch(); event.getAsyncContext().dispatch();
} }

View File

@ -89,8 +89,7 @@ public class StressTest
_server = new Server(); _server = new Server();
_server.setThreadPool(_threads); _server.setThreadPool(_threads);
_connector = new SelectChannelConnector(); _connector = new SelectChannelConnector(1,1);
_connector.setAcceptors(1);
_connector.setAcceptQueueSize(5000); _connector.setAcceptQueueSize(5000);
_connector.setMaxIdleTime(30000); _connector.setMaxIdleTime(30000);
_server.addConnector(_connector); _server.addConnector(_connector);
@ -123,6 +122,8 @@ public class StressTest
assumeTrue(!OS.IS_OSX || Stress.isEnabled()); assumeTrue(!OS.IS_OSX || Stress.isEnabled());
doThreads(10,10,false); doThreads(10,10,false);
Thread.sleep(1000);
doThreads(100,20,false);
if (Stress.isEnabled()) if (Stress.isEnabled())
{ {
Thread.sleep(1000); Thread.sleep(1000);
@ -139,6 +140,8 @@ public class StressTest
assumeTrue(!OS.IS_OSX || Stress.isEnabled()); assumeTrue(!OS.IS_OSX || Stress.isEnabled());
doThreads(20,10,true); doThreads(20,10,true);
Thread.sleep(1000);
doThreads(100,50,true);
if (Stress.isEnabled()) if (Stress.isEnabled())
{ {
Thread.sleep(1000); Thread.sleep(1000);
@ -281,7 +284,7 @@ public class StressTest
System.out.println(" stage:\tbind\twrite\trecv\tdispatch\twrote\ttotal"); System.out.println(" stage:\tbind\twrite\trecv\tdispatch\twrote\ttotal");
for (int q=0;q<quantums;q++) for (int q=0;q<quantums;q++)
{ {
System.out.print(q+"00<=latency<"+(q+1)+"00"); System.out.printf("%02d00<=l<%02d00",q,(q+1));
for (int i=0;i<_latencies.length;i++) for (int i=0;i<_latencies.length;i++)
System.out.print("\t"+count[i][q]); System.out.print("\t"+count[i][q]);
System.out.println(); System.out.println();

View File

@ -149,6 +149,7 @@ class SuspendHandler extends HandlerWrapper
final AsyncContext asyncContext = baseRequest.startAsync(); final AsyncContext asyncContext = baseRequest.startAsync();
System.err.println("STARTASYNC");
response.getOutputStream().println("STARTASYNC"); response.getOutputStream().println("STARTASYNC");
asyncContext.addListener(LocalAsyncContextTest.__asyncListener); asyncContext.addListener(LocalAsyncContextTest.__asyncListener);
asyncContext.addListener(LocalAsyncContextTest.__asyncListener1); asyncContext.addListener(LocalAsyncContextTest.__asyncListener1);
@ -165,6 +166,7 @@ class SuspendHandler extends HandlerWrapper
try try
{ {
Thread.sleep(_completeAfter); Thread.sleep(_completeAfter);
System.err.println("COMPLETED");
response.getOutputStream().println("COMPLETED"); response.getOutputStream().println("COMPLETED");
response.setStatus(200); response.setStatus(200);
baseRequest.setHandled(true); baseRequest.setHandled(true);
@ -179,6 +181,7 @@ class SuspendHandler extends HandlerWrapper
} }
else if (_completeAfter==0) else if (_completeAfter==0)
{ {
System.err.println("COMPLETED0");
response.getOutputStream().println("COMPLETED"); response.getOutputStream().println("COMPLETED");
response.setStatus(200); response.setStatus(200);
baseRequest.setHandled(true); baseRequest.setHandled(true);
@ -212,13 +215,20 @@ class SuspendHandler extends HandlerWrapper
else else
{ {
if (request.getAttribute("TIMEOUT")!=null) if (request.getAttribute("TIMEOUT")!=null)
{
System.err.println("TIMEOUT");
response.getOutputStream().println("TIMEOUT"); response.getOutputStream().println("TIMEOUT");
}
else else
{
System.err.println("DISPATCHED");
response.getOutputStream().println("DISPATCHED"); response.getOutputStream().println("DISPATCHED");
}
if (_suspendFor2>=0) if (_suspendFor2>=0)
{ {
final AsyncContext asyncContext = baseRequest.startAsync(); final AsyncContext asyncContext = baseRequest.startAsync();
System.err.println("STARTASYNC2");
response.getOutputStream().println("STARTASYNC2"); response.getOutputStream().println("STARTASYNC2");
if (_suspendFor2>0) if (_suspendFor2>0)
asyncContext.setTimeout(_suspendFor2); asyncContext.setTimeout(_suspendFor2);
@ -233,6 +243,7 @@ class SuspendHandler extends HandlerWrapper
try try
{ {
Thread.sleep(_completeAfter2); Thread.sleep(_completeAfter2);
System.err.println("COMPLETED2");
response.getOutputStream().println("COMPLETED2"); response.getOutputStream().println("COMPLETED2");
response.setStatus(200); response.setStatus(200);
baseRequest.setHandled(true); baseRequest.setHandled(true);
@ -247,6 +258,7 @@ class SuspendHandler extends HandlerWrapper
} }
else if (_completeAfter2==0) else if (_completeAfter2==0)
{ {
System.err.println("COMPLETED2==0");
response.getOutputStream().println("COMPLETED2"); response.getOutputStream().println("COMPLETED2");
response.setStatus(200); response.setStatus(200);
baseRequest.setHandled(true); baseRequest.setHandled(true);