diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 91ee1a88845..0d35192054c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -222,6 +222,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint public void close() { _closed=true; + onClose(); } /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 4c6d44947e5..e1fe0cc1f09 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -373,7 +373,7 @@ public abstract class HttpChannel __currentChannel.set(null); if (threadName!=null) Thread.currentThread().setName(threadName); - + if (_state.isUncompleted()) { try @@ -413,6 +413,8 @@ public abstract class HttpChannel completed(); } } + + LOG.debug("{} !process",this); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java index ef5c31eebe4..2262e618486 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.AsyncByteArrayEndPoint; import org.eclipse.jetty.util.BufferUtil; @@ -45,12 +46,25 @@ public class LocalHttpConnector extends HttpConnector return this; } + /* ------------------------------------------------------------ */ + /** Sends requests and get's responses based on thread activity. + * Returns all the responses received once the thread activity has + * returned to the level it was before the requests. + * @param requests + * @return + * @throws Exception + */ public String getResponses(String requests) throws Exception { ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StringUtil.__UTF8_CHARSET)); return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET); } + /* ------------------------------------------------------------ */ + /** Sends requests and get's responses based on thread activity. + * Returns all the responses received once the thread activity has + * returned to the level it was before the requests. + */ public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception { LOG.debug("getResponses"); @@ -63,6 +77,13 @@ public class LocalHttpConnector extends HttpConnector return request.takeOutput(); } + /* ------------------------------------------------------------ */ + /** + * Execute a request and return the EndPoint through which + * responses can be received. + * @param rawRequest + * @return + */ public LocalEndPoint executeRequest(String rawRequest) { Phaser phaser=_executor._phaser; @@ -148,6 +169,8 @@ public class LocalHttpConnector extends HttpConnector public class LocalEndPoint extends AsyncByteArrayEndPoint { + private CountDownLatch _closed = new CountDownLatch(1); + LocalEndPoint() { setGrowOutput(true); @@ -166,6 +189,41 @@ public class LocalHttpConnector extends HttpConnector Thread.yield(); setInput(BufferUtil.toBuffer(s,StringUtil.__UTF8_CHARSET)); } + + @Override + public void onClose() + { + super.onClose(); + _closed.countDown(); + } + + @Override + public void shutdownOutput() + { + super.shutdownOutput(); + close(); + } + + public void waitUntilClosed() + { + while (isOpen()) + { + try + { + if (!_closed.await(10,TimeUnit.SECONDS)) + { + System.err.println("wait timeout:\n--"); + System.err.println(takeOutputString()); + System.err.println("=="); + break; + } + } + catch(Exception e) + { + LOG.warn(e); + } + } + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java index d6dc00eae2b..254226665e8 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/LocalAsyncContextTest.java @@ -23,7 +23,9 @@ import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import org.eclipse.jetty.server.session.SessionHandler; +import org.hamcrest.Matchers; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -44,6 +46,9 @@ public class LocalAsyncContextTest _server.setHandler(session); _server.start(); + + __completed.set(0); + __completed1.set(0); } protected Connector initConnector() @@ -59,11 +64,9 @@ public class LocalAsyncContextTest } @Test - public void testSuspendResume() throws Exception + public void testSuspendTimeout() throws Exception { String response; - __completed.set(0); - __completed1.set(0); _handler.setRead(0); _handler.setSuspendFor(1000); _handler.setResumeAfter(-1); @@ -72,19 +75,38 @@ public class LocalAsyncContextTest check(response,"TIMEOUT"); assertEquals(1,__completed.get()); assertEquals(1,__completed1.get()); + } + @Test + public void testSuspendResume0() throws Exception + { + String response; + _handler.setRead(0); _handler.setSuspendFor(10000); - _handler.setResumeAfter(0); _handler.setCompleteAfter(-1); response=process(null); - check(response,"DISPATCHED"); + check(response,"STARTASYNC","DISPATCHED"); + } + @Test + public void testSuspendResume100() throws Exception + { + String response; + _handler.setRead(0); + _handler.setSuspendFor(10000); _handler.setResumeAfter(100); _handler.setCompleteAfter(-1); response=process(null); - check(response,"DISPATCHED"); + check(response,"STARTASYNC","DISPATCHED"); + } + @Test + public void testSuspendOther() throws Exception + { + String response; + _handler.setRead(0); + _handler.setSuspendFor(10000); _handler.setResumeAfter(-1); _handler.setCompleteAfter(0); response=process(null); @@ -162,15 +184,14 @@ public class LocalAsyncContextTest protected void check(String response,String... content) { - assertEquals("HTTP/1.1 200 OK",response.substring(0,15)); + Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 200 OK")); int i=0; for (String m:content) { + Assert.assertThat(response,Matchers.containsString(m)); i=response.indexOf(m,i); - assertTrue(i>=0); i+=m.length(); } - } private synchronized String process(String content) throws Exception @@ -184,12 +205,18 @@ public class LocalAsyncContextTest else request+="Content-Length: "+content.length()+"\r\n" +"\r\n" + content; - return getResponse(request); + System.err.println("REQUEST: "+request); + String response=getResponse(request); + System.err.println("RESPONSE: "+response); + return response; } protected String getResponse(String request) throws Exception { - return ((LocalHttpConnector)_connector).getResponses(request); + LocalHttpConnector connector=(LocalHttpConnector)_connector; + LocalHttpConnector.LocalEndPoint endp = connector.executeRequest(request); + endp.waitUntilClosed(); + return endp.takeOutputString(); } @@ -203,18 +230,21 @@ public class LocalAsyncContextTest @Override public void onComplete(AsyncEvent event) throws IOException { + System.err.println("onComplete"); __completed.incrementAndGet(); } @Override public void onError(AsyncEvent event) throws IOException { + System.err.println("onError"); __completed.incrementAndGet(); } @Override public void onStartAsync(AsyncEvent event) throws IOException { + System.err.println("onStartAsync"); event.getSuppliedResponse().getOutputStream().println("startasync"); event.getAsyncContext().addListener(this); } @@ -222,6 +252,7 @@ public class LocalAsyncContextTest @Override public void onTimeout(AsyncEvent event) throws IOException { + System.err.println("onTimeout - dispatch!"); event.getSuppliedRequest().setAttribute("TIMEOUT",Boolean.TRUE); event.getAsyncContext().dispatch(); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java index ea49aaa9905..4bd4b99ccf6 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java @@ -89,8 +89,7 @@ public class StressTest _server = new Server(); _server.setThreadPool(_threads); - _connector = new SelectChannelConnector(); - _connector.setAcceptors(1); + _connector = new SelectChannelConnector(1,1); _connector.setAcceptQueueSize(5000); _connector.setMaxIdleTime(30000); _server.addConnector(_connector); @@ -123,6 +122,8 @@ public class StressTest assumeTrue(!OS.IS_OSX || Stress.isEnabled()); doThreads(10,10,false); + Thread.sleep(1000); + doThreads(100,20,false); if (Stress.isEnabled()) { Thread.sleep(1000); @@ -139,6 +140,8 @@ public class StressTest assumeTrue(!OS.IS_OSX || Stress.isEnabled()); doThreads(20,10,true); + Thread.sleep(1000); + doThreads(100,50,true); if (Stress.isEnabled()) { Thread.sleep(1000); @@ -281,22 +284,22 @@ public class StressTest System.out.println(" stage:\tbind\twrite\trecv\tdispatch\twrote\ttotal"); for (int q=0;q=0) { final AsyncContext asyncContext = baseRequest.startAsync(); + System.err.println("STARTASYNC2"); response.getOutputStream().println("STARTASYNC2"); if (_suspendFor2>0) asyncContext.setTimeout(_suspendFor2); @@ -233,6 +243,7 @@ class SuspendHandler extends HandlerWrapper try { Thread.sleep(_completeAfter2); + System.err.println("COMPLETED2"); response.getOutputStream().println("COMPLETED2"); response.setStatus(200); baseRequest.setHandled(true); @@ -247,6 +258,7 @@ class SuspendHandler extends HandlerWrapper } else if (_completeAfter2==0) { + System.err.println("COMPLETED2==0"); response.getOutputStream().println("COMPLETED2"); response.setStatus(200); baseRequest.setHandled(true);