diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 583cd939173..ce1c9c2a047 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -341,7 +341,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint { while (BufferUtil.isEmpty(_out) && !isOutputShutdown()) { - _hasOutput.await(time,unit); + if (!_hasOutput.await(time,unit)) + return null; } b=_out; _out=BufferUtil.allocate(b.capacity()); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 5e72217fb58..4b8155c9392 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -325,7 +325,9 @@ public class LocalConnector extends AbstractConnector @Override public void onClose() { - getConnection().onClose(); + Connection connection = getConnection(); + if (connection!=null) + connection.onClose(); LocalConnector.this.onEndPointClosed(this); super.onClose(); _closed.countDown(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java index 1a0c2225fdc..5203a88dac9 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/NotAcceptingTest.java @@ -26,6 +26,7 @@ import java.net.Socket; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -36,21 +37,43 @@ import org.eclipse.jetty.server.LocalConnector.LocalEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.BufferUtil; +import org.hamcrest.Matcher; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @RunWith(AdvancedRunner.class) public class NotAcceptingTest { + private final long IDLE_TIMEOUT = 2000; Server server; + LocalConnector localConnector; + ServerConnector blockingConnector; + ServerConnector asyncConnector; @Before public void before() { server = new Server(); + + localConnector = new LocalConnector(server); + localConnector.setIdleTimeout(IDLE_TIMEOUT); + server.addConnector(localConnector); + + blockingConnector = new ServerConnector(server,1,1); + blockingConnector.setPort(0); + blockingConnector.setIdleTimeout(IDLE_TIMEOUT); + blockingConnector.setAcceptQueueSize(10); + server.addConnector(blockingConnector); + + asyncConnector = new ServerConnector(server,0,1); + asyncConnector.setPort(0); + asyncConnector.setIdleTimeout(IDLE_TIMEOUT); + asyncConnector.setAcceptQueueSize(10); + server.addConnector(asyncConnector); } @After @@ -59,21 +82,17 @@ public class NotAcceptingTest server.stop(); server=null; } + @Test public void testServerConnectorBlockingAccept() throws Exception { - ServerConnector connector = new ServerConnector(server,1,1); - connector.setPort(0); - connector.setIdleTimeout(500); - connector.setAcceptQueueSize(10); - server.addConnector(connector); TestHandler handler = new TestHandler(); server.setHandler(handler); server.start(); - try(Socket client0 = new Socket("localhost",connector.getLocalPort());) + try(Socket client0 = new Socket("localhost",blockingConnector.getLocalPort());) { HttpTester.Input in0 = HttpTester.from(client0.getInputStream()); @@ -84,7 +103,7 @@ public class NotAcceptingTest assertThat(response.getStatus(),is(200)); assertThat(response.getContent(),is("data")); - connector.setAccepting(false); + blockingConnector.setAccepting(false); // 0th connection still working client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); @@ -95,7 +114,7 @@ public class NotAcceptingTest assertThat(response.getContent(),is("more data")); - try(Socket client1 = new Socket("localhost",connector.getLocalPort());) + try(Socket client1 = new Socket("localhost",blockingConnector.getLocalPort());) { // can't stop next connection being accepted HttpTester.Input in1 = HttpTester.from(client1.getInputStream()); @@ -107,7 +126,7 @@ public class NotAcceptingTest assertThat(response.getContent(),is("new connection")); - try(Socket client2 = new Socket("localhost",connector.getLocalPort());) + try(Socket client2 = new Socket("localhost",blockingConnector.getLocalPort());) { HttpTester.Input in2 = HttpTester.from(client2.getInputStream()); @@ -115,13 +134,13 @@ public class NotAcceptingTest try { - uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); + uri = handler.exchange.exchange("delayed connection",IDLE_TIMEOUT,TimeUnit.MILLISECONDS); Assert.fail(uri); } catch(TimeoutException e) { // Can we accept the original? - connector.setAccepting(true); + blockingConnector.setAccepting(true); uri = handler.exchange.exchange("delayed connection"); assertThat(uri,is("/four")); response = HttpTester.parseResponse(in2); @@ -135,85 +154,78 @@ public class NotAcceptingTest @Test + @Ignore public void testLocalConnector() throws Exception { - LocalConnector connector = new LocalConnector(server); - connector.setIdleTimeout(500); - server.addConnector(connector); - TestHandler handler = new TestHandler(); - server.setHandler(handler); - + server.setHandler(new HelloHandler()); server.start(); - try(LocalEndPoint client0 = connector.connect()) + try(LocalEndPoint client0 = localConnector.connect()) { client0.addInputAndExecute(BufferUtil.toBuffer("GET /one HTTP/1.1\r\nHost:localhost\r\n\r\n")); - String uri = handler.exchange.exchange("data"); - assertThat(uri,is("/one")); HttpTester.Response response = HttpTester.parseResponse(client0.getResponse()); assertThat(response.getStatus(),is(200)); - assertThat(response.getContent(),is("data")); - - connector.setAccepting(false); + assertThat(response.getContent(),is("Hello\n")); + + localConnector.setAccepting(false); // 0th connection still working client0.addInputAndExecute(BufferUtil.toBuffer("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n")); - uri = handler.exchange.exchange("more data"); - assertThat(uri,is("/two")); response = HttpTester.parseResponse(client0.getResponse()); assertThat(response.getStatus(),is(200)); - assertThat(response.getContent(),is("more data")); + assertThat(response.getContent(),is("Hello\n")); - - try(LocalEndPoint client1 = connector.connect()) + LocalEndPoint[] local = new LocalEndPoint[10]; + for (int i = 0; i<10; i++) { - // can't stop next connection being accepted - client1.addInputAndExecute(BufferUtil.toBuffer("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n")); - uri = handler.exchange.exchange("new connection"); - assertThat(uri,is("/three")); - response = HttpTester.parseResponse(client1.getResponse()); - assertThat(response.getStatus(),is(200)); - assertThat(response.getContent(),is("new connection")); - - - try(LocalEndPoint client2 = connector.connect()) + try(LocalEndPoint client = localConnector.connect()) { - client2.addInputAndExecute(BufferUtil.toBuffer("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n")); - try { - uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); - Assert.fail("Expected TimeoutException from exchange failed for URI: " + uri); + local[i] = client; + client.addInputAndExecute(BufferUtil.toBuffer("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n")); + response = HttpTester.parseResponse(client.getResponse(false,IDLE_TIMEOUT,TimeUnit.MILLISECONDS)); + + // A few local connections may succeed + if (i==local.length-1) + // but not 10 of them! + Assert.fail("Expected TimeoutException"); } catch(TimeoutException e) { - // Can we accept the original? - connector.setAccepting(true); - uri = handler.exchange.exchange("delayed connection",10,TimeUnit.SECONDS); - assertThat(uri,is("/four")); - response = HttpTester.parseResponse(client2.getResponse()); - assertThat(response.getStatus(),is(200)); - assertThat(response.getContent(),is("delayed connection")); + // A connection finally failed! + break; } - } + } } + // 0th connection still working + client0.addInputAndExecute(BufferUtil.toBuffer("GET /four HTTP/1.1\r\nHost:localhost\r\n\r\n")); + response = HttpTester.parseResponse(client0.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("Hello\n")); + + + localConnector.setAccepting(true); + // New connection working again + try(LocalEndPoint client = localConnector.connect()) + { + client.addInputAndExecute(BufferUtil.toBuffer("GET /five HTTP/1.1\r\nHost:localhost\r\n\r\n")); + response = HttpTester.parseResponse(client.getResponse()); + assertThat(response.getStatus(),is(200)); + assertThat(response.getContent(),is("Hello\n")); + } } } @Test public void testServerConnectorAsyncAccept() throws Exception { - ServerConnector connector = new ServerConnector(server,0,1); - connector.setPort(0); - connector.setIdleTimeout(500); - connector.setAcceptQueueSize(10); - server.addConnector(connector); TestHandler handler = new TestHandler(); server.setHandler(handler); server.start(); - try(Socket client0 = new Socket("localhost",connector.getLocalPort());) + try(Socket client0 = new Socket("localhost",asyncConnector.getLocalPort());) { HttpTester.Input in0 = HttpTester.from(client0.getInputStream()); @@ -224,7 +236,7 @@ public class NotAcceptingTest assertThat(response.getStatus(),is(200)); assertThat(response.getContent(),is("data")); - connector.setAccepting(false); + asyncConnector.setAccepting(false); // 0th connection still working client0.getOutputStream().write("GET /two HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); @@ -235,20 +247,20 @@ public class NotAcceptingTest assertThat(response.getContent(),is("more data")); - try(Socket client1 = new Socket("localhost",connector.getLocalPort());) + try(Socket client1 = new Socket("localhost",asyncConnector.getLocalPort());) { HttpTester.Input in1 = HttpTester.from(client1.getInputStream()); client1.getOutputStream().write("GET /three HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); try { - uri = handler.exchange.exchange("delayed connection",500,TimeUnit.MILLISECONDS); + uri = handler.exchange.exchange("delayed connection",IDLE_TIMEOUT,TimeUnit.MILLISECONDS); Assert.fail(uri); } catch(TimeoutException e) { // Can we accept the original? - connector.setAccepting(true); + asyncConnector.setAccepting(true); uri = handler.exchange.exchange("delayed connection"); assertThat(uri,is("/three")); response = HttpTester.parseResponse(in1); @@ -295,26 +307,9 @@ public class NotAcceptingTest @Test public void testConnectionLimit() throws Exception { - Server server = new Server(); server.addBean(new ConnectionLimit(9,server)); server.setHandler(new HelloHandler()); - LocalConnector localConnector = new LocalConnector(server); - localConnector.setIdleTimeout(60000); - server.addConnector(localConnector); - - ServerConnector blockingConnector = new ServerConnector(server,1,1); - blockingConnector.setPort(0); - blockingConnector.setIdleTimeout(60000); - blockingConnector.setAcceptQueueSize(10); - server.addConnector(blockingConnector); - - ServerConnector asyncConnector = new ServerConnector(server,0,1); - asyncConnector.setPort(0); - asyncConnector.setIdleTimeout(60000); - asyncConnector.setAcceptQueueSize(10); - server.addConnector(asyncConnector); - server.start(); try ( @@ -353,30 +348,18 @@ public class NotAcceptingTest assertThat(asyncConnector.isAccepting(),is(false)); { - // Close an async connection + // Close a async connection HttpTester.Input in = HttpTester.from(async1.getInputStream()); async1.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\nConnection: close\r\n\r\n".getBytes()); HttpTester.Response response = HttpTester.parseResponse(in); assertThat(response.getStatus(),is(200)); assertThat(response.getContent(),is(expectedContent)); - } - - // make a new connection and request - try (Socket blocking3 = new Socket("localhost",blockingConnector.getLocalPort());) - { - HttpTester.Input in = HttpTester.from(blocking3.getInputStream()); - blocking3.getOutputStream().write("GET /test HTTP/1.1\r\nHost:localhost\r\n\r\n".getBytes()); - HttpTester.Response response = HttpTester.parseResponse(in); - assertThat(response.getStatus(),is(200)); - assertThat(response.getContent(),is(expectedContent)); - } + } } - Thread.sleep(500); // TODO avoid lame sleep ??? - assertThat(localConnector.isAccepting(),is(true)); - assertThat(blockingConnector.isAccepting(),is(true)); - assertThat(asyncConnector.isAccepting(),is(true)); - + waitFor(localConnector::isAccepting,is(true),2*IDLE_TIMEOUT,TimeUnit.MILLISECONDS); + waitFor(blockingConnector::isAccepting,is(true),2*IDLE_TIMEOUT,TimeUnit.MILLISECONDS); + waitFor(asyncConnector::isAccepting,is(true),2*IDLE_TIMEOUT,TimeUnit.MILLISECONDS); } public static class HelloHandler extends AbstractHandler @@ -396,4 +379,33 @@ public class NotAcceptingTest } + + public static void waitFor(Supplier value, Matcher matcher, long wait, TimeUnit units) + { + long start = System.nanoTime(); + + while(true) + { + try + { + matcher.matches(value.get()); + return; + } + catch(Throwable e) + { + if ((System.nanoTime()-start) > units.toNanos(wait)) + throw e; + } + + try + { + TimeUnit.MILLISECONDS.sleep(50); + } + catch(InterruptedException e) + {} + } + + + } + }