diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java index f53726cf547..10d5cccfdae 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.server; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -31,9 +32,12 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -64,8 +68,8 @@ public class ThreadStarvationTest { final static int BUFFER_SIZE=1024*1024; final static int BUFFERS=64; - final static int CLIENTS=10; final static int THREADS=5; + final static int CLIENTS=THREADS*2; @Rule public TestTracker tracker = new TestTracker(); @@ -201,47 +205,81 @@ public class ThreadStarvationTest { prepareServer(new ReadHandler()); _server.start(); + + ExecutorService clientExecutors = Executors.newFixedThreadPool(CLIENTS); + + List> clientTasks = new ArrayList<>(); + + for(int i=0; i + { + try (Socket client = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream()) + { + client.setSoTimeout(10000); - Socket[] client = new Socket[CLIENTS]; - OutputStream[] os = new OutputStream[client.length]; - InputStream[] is = new InputStream[client.length]; - - for (int i = 0; i < client.length; i++) - { - client[i] = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); - client[i].setSoTimeout(10000); - - os[i] = client[i].getOutputStream(); - is[i] = client[i].getInputStream(); - - String request = "" + - "PUT / HTTP/1.0\r\n" + - "host: localhost\r\n" + - "content-length: 10\r\n" + - "\r\n" + - "1"; - os[i].write(request.getBytes(StandardCharsets.UTF_8)); - os[i].flush(); + String request = "" + + "PUT / HTTP/1.0\r\n" + + "host: localhost\r\n" + + "content-length: 10\r\n" + + "\r\n" + + "1"; + + // Write partial request + out.write(request.getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Finish Request + Thread.sleep(1500); + out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + // Read Response + String response = IO.toString(in); + assertEquals(-1, in.read()); + return response; + } + }); } - - Thread.sleep(500); - _threadPool.dump(System.out, ""); - - for (int i = 0; i < client.length; i++) + +// new Thread(()->{ +// try +// { +// TimeUnit.SECONDS.sleep(10); +// +// ServerConnector conn = _server.getBean(ServerConnector.class); +// ManagedSelector ms = conn.getSelectorManager().getBean(ManagedSelector.class); +// +// Selector sel = ms.getSelector(); +// sel.keys().stream().map((key)->key.attachment()).forEach( +// (attach) -> { +// System.out.println(attach); +// SocketChannelEndPoint endp = (SocketChannelEndPoint) attach; +// SslConnection sslconn = (SslConnection) endp.getConnection(); +// sslconn.dumpBuffers(); +// }); +// +// _server.dump(System.out, ""); +// } +// catch (Throwable ignore) +// { +// } +// }).start(); + + try { - os[i].write(("234567890\r\n").getBytes(StandardCharsets.UTF_8)); - os[i].flush(); - } - - Thread.sleep(500); - _threadPool.dump(System.out, ""); - - for (int i = 0; i < client.length; i++) + List> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS); + + for (Future responseFut : responses) + { + String response = responseFut.get(); + assertThat(response, containsString("200 OK")); + assertThat(response, containsString("Read Input 10")); + } + } finally { - String response = IO.toString(is[i]); - assertEquals(-1, is[i].read()); - assertThat(response, containsString("200 OK")); - assertThat(response, containsString("Read Input 10")); + clientExecutors.shutdownNow(); } } @@ -279,121 +317,71 @@ public class ThreadStarvationTest { prepareServer(new WriteHandler()); _server.start(); - - Socket[] client = new Socket[CLIENTS]; - OutputStream[] os = new OutputStream[client.length]; - final InputStream[] is = new InputStream[client.length]; - - for (int i = 0; i < client.length; i++) - { - client[i] = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); - client[i].setSoTimeout(10000); - - os[i] = client[i].getOutputStream(); - is[i] = client[i].getInputStream(); - - String request = - "GET / HTTP/1.0\r\n" + - "host: localhost\r\n" + - "\r\n"; - os[i].write(request.getBytes(StandardCharsets.UTF_8)); - os[i].flush(); - } - - Thread.sleep(100); - - final AtomicLong total=new AtomicLong(); - final CountDownLatch latch=new CountDownLatch(client.length); - - for (int i = client.length; i-->0;) - { - final int c=i; - new Thread() - { - @Override - public void run() - { - byte[] content=new byte[BUFFER_SIZE]; - int content_length=0; - String header= "No HEADER!"; - try - { - // Read an initial content buffer - int len=0; - - while (len0) - { - len=is[c].read(content); - if (len>0) - content_length+=len; - } - - // System.err.printf("client %d cl=%d %n%s%n",c,content_length,header); - total.addAndGet(content_length); - } - catch(Exception e) - { - e.printStackTrace(); - } - finally - { - latch.countDown(); - } - } - }.start(); - - } - - latch.await(); - assertEquals(CLIENTS*BUFFERS*BUFFER_SIZE,total.get()); - } + ExecutorService clientExecutors = Executors.newFixedThreadPool(CLIENTS); + + List> clientTasks = new ArrayList<>(); + + for(int i=0; i + { + try (Socket client = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream()) + { + client.setSoTimeout(30000); + + String request = "" + + "GET / HTTP/1.0\r\n" + + "host: localhost\r\n" + + "\r\n"; + + // Write GET request + out.write(request.getBytes(StandardCharsets.UTF_8)); + out.flush(); + + TimeUnit.MILLISECONDS.sleep(1500); + + // Read Response + long bodyCount = 0; + long len; + + byte buf[] = new byte[1024]; + + while((len = in.read(buf,0,buf.length)) != -1) + { + for(int x=0; x> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS); + + long expected = BUFFERS * BUFFER_SIZE; + for (Future responseFut : responses) + { + Long bodyCount = responseFut.get(); + assertThat(bodyCount.longValue(), is(expected)); + } + } finally + { + clientExecutors.shutdownNow(); + } + } protected static class WriteHandler extends AbstractHandler { byte[] content=new byte[BUFFER_SIZE]; { - Arrays.fill(content,(byte)'x'); + // Using a character that will not show up in a HTTP response header + Arrays.fill(content,(byte)'!'); } @Override @@ -410,6 +398,4 @@ public class ThreadStarvationTest } } } - - }