Updates for better ThreadStarvationTests

This commit is contained in:
Joakim Erdfelt 2016-11-22 17:37:40 -07:00
parent bb428fb402
commit efc227e0e4
1 changed files with 137 additions and 151 deletions

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -31,9 +32,12 @@ import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -64,8 +68,8 @@ public class ThreadStarvationTest
{ {
final static int BUFFER_SIZE=1024*1024; final static int BUFFER_SIZE=1024*1024;
final static int BUFFERS=64; final static int BUFFERS=64;
final static int CLIENTS=10;
final static int THREADS=5; final static int THREADS=5;
final static int CLIENTS=THREADS*2;
@Rule @Rule
public TestTracker tracker = new TestTracker(); public TestTracker tracker = new TestTracker();
@ -202,17 +206,18 @@ public class ThreadStarvationTest
prepareServer(new ReadHandler()); prepareServer(new ReadHandler());
_server.start(); _server.start();
Socket[] client = new Socket[CLIENTS]; ExecutorService clientExecutors = Executors.newFixedThreadPool(CLIENTS);
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
for (int i = 0; i < client.length; i++) List<Callable<String>> clientTasks = new ArrayList<>();
for(int i=0; i<CLIENTS; i++) {
clientTasks.add(() ->
{ {
client[i] = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); try (Socket client = clientSocketProvider.newSocket("localhost", _connector.getLocalPort());
client[i].setSoTimeout(10000); OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream())
os[i] = client[i].getOutputStream(); {
is[i] = client[i].getInputStream(); client.setSoTimeout(10000);
String request = "" + String request = "" +
"PUT / HTTP/1.0\r\n" + "PUT / HTTP/1.0\r\n" +
@ -220,29 +225,62 @@ public class ThreadStarvationTest
"content-length: 10\r\n" + "content-length: 10\r\n" +
"\r\n" + "\r\n" +
"1"; "1";
os[i].write(request.getBytes(StandardCharsets.UTF_8));
os[i].flush(); // Write partial request
out.write(request.getBytes(StandardCharsets.UTF_8));
out.flush();
// Finish Request
Thread.sleep(1500);
out.write(("234567890\r\n").getBytes(StandardCharsets.UTF_8));
out.flush();
// Read Response
String response = IO.toString(in);
assertEquals(-1, in.read());
return response;
}
});
} }
Thread.sleep(500); // new Thread(()->{
_threadPool.dump(System.out, ""); // try
// {
// TimeUnit.SECONDS.sleep(10);
//
// ServerConnector conn = _server.getBean(ServerConnector.class);
// ManagedSelector ms = conn.getSelectorManager().getBean(ManagedSelector.class);
//
// Selector sel = ms.getSelector();
// sel.keys().stream().map((key)->key.attachment()).forEach(
// (attach) -> {
// System.out.println(attach);
// SocketChannelEndPoint endp = (SocketChannelEndPoint) attach;
// SslConnection sslconn = (SslConnection) endp.getConnection();
// sslconn.dumpBuffers();
// });
//
// _server.dump(System.out, "");
// }
// catch (Throwable ignore)
// {
// }
// }).start();
for (int i = 0; i < client.length; i++) try
{ {
os[i].write(("234567890\r\n").getBytes(StandardCharsets.UTF_8)); List<Future<String>> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS);
os[i].flush();
}
Thread.sleep(500); for (Future<String> responseFut : responses)
_threadPool.dump(System.out, "");
for (int i = 0; i < client.length; i++)
{ {
String response = IO.toString(is[i]); String response = responseFut.get();
assertEquals(-1, is[i].read());
assertThat(response, containsString("200 OK")); assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10")); assertThat(response, containsString("Read Input 10"));
} }
} finally
{
clientExecutors.shutdownNow();
}
} }
protected static class ReadHandler extends AbstractHandler protected static class ReadHandler extends AbstractHandler
@ -280,120 +318,70 @@ public class ThreadStarvationTest
prepareServer(new WriteHandler()); prepareServer(new WriteHandler());
_server.start(); _server.start();
Socket[] client = new Socket[CLIENTS]; ExecutorService clientExecutors = Executors.newFixedThreadPool(CLIENTS);
OutputStream[] os = new OutputStream[client.length];
final InputStream[] is = new InputStream[client.length];
for (int i = 0; i < client.length; i++) List<Callable<Long>> clientTasks = new ArrayList<>();
for(int i=0; i<CLIENTS; i++) {
clientTasks.add(() ->
{ {
client[i] = clientSocketProvider.newSocket("localhost", _connector.getLocalPort()); try (Socket client = clientSocketProvider.newSocket("localhost", _connector.getLocalPort());
client[i].setSoTimeout(10000); OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream())
{
client.setSoTimeout(30000);
os[i] = client[i].getOutputStream(); String request = "" +
is[i] = client[i].getInputStream();
String request =
"GET / HTTP/1.0\r\n" + "GET / HTTP/1.0\r\n" +
"host: localhost\r\n" + "host: localhost\r\n" +
"\r\n"; "\r\n";
os[i].write(request.getBytes(StandardCharsets.UTF_8));
os[i].flush(); // Write GET request
out.write(request.getBytes(StandardCharsets.UTF_8));
out.flush();
TimeUnit.MILLISECONDS.sleep(1500);
// Read Response
long bodyCount = 0;
long len;
byte buf[] = new byte[1024];
while((len = in.read(buf,0,buf.length)) != -1)
{
for(int x=0; x<len; x++)
{
if(buf[x] == '!') bodyCount++;
}
}
return bodyCount;
}
});
} }
Thread.sleep(100);
final AtomicLong total=new AtomicLong();
final CountDownLatch latch=new CountDownLatch(client.length);
for (int i = client.length; i-->0;)
{
final int c=i;
new Thread()
{
@Override
public void run()
{
byte[] content=new byte[BUFFER_SIZE];
int content_length=0;
String header= "No HEADER!";
try try
{ {
// Read an initial content buffer List<Future<Long>> responses = clientExecutors.invokeAll(clientTasks, 60, TimeUnit.SECONDS);
int len=0;
while (len<BUFFER_SIZE) long expected = BUFFERS * BUFFER_SIZE;
for (Future<Long> responseFut : responses)
{ {
int l=is[c].read(content,len,content.length-len); Long bodyCount = responseFut.get();
if (l<0) assertThat(bodyCount.longValue(), is(expected));
throw new IllegalStateException();
len+=l;
content_length+=l;
} }
} finally
// Look for the end of the header
int state=0;
loop: for(int j=0;j<len;j++)
{ {
content_length--; clientExecutors.shutdownNow();
switch(content[j])
{
case '\r':
state++;
break;
case '\n':
switch(state)
{
case 1:
state=2;
break;
case 3:
header=new String(content,0,j,StandardCharsets.ISO_8859_1);
assertThat(header,containsString(" 200 OK"));
break loop;
}
break;
default:
state=0;
break;
} }
} }
// Read the rest of the body
while(len>0)
{
len=is[c].read(content);
if (len>0)
content_length+=len;
}
// System.err.printf("client %d cl=%d %n%s%n",c,content_length,header);
total.addAndGet(content_length);
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
latch.countDown();
}
}
}.start();
}
latch.await();
assertEquals(CLIENTS*BUFFERS*BUFFER_SIZE,total.get());
}
protected static class WriteHandler extends AbstractHandler protected static class WriteHandler extends AbstractHandler
{ {
byte[] content=new byte[BUFFER_SIZE]; byte[] content=new byte[BUFFER_SIZE];
{ {
Arrays.fill(content,(byte)'x'); // Using a character that will not show up in a HTTP response header
Arrays.fill(content,(byte)'!');
} }
@Override @Override
@ -410,6 +398,4 @@ public class ThreadStarvationTest
} }
} }
} }
} }