Fixes #558 - HTTP/2 server hangs when thread pool is low on threads.

Fixed test that was broken after changes for #557.
This commit is contained in:
Simone Bordet 2016-05-16 15:40:35 +02:00
parent dee3331ffb
commit 55e5f74889
1 changed files with 23 additions and 49 deletions

View File

@ -37,7 +37,9 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.junit.*; import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -165,34 +167,37 @@ public class ThreadStarvationTest
} }
@Test @Test
@Ignore
public void testEPCExitsLowThreadsMode() throws Exception public void testEPCExitsLowThreadsMode() throws Exception
{ {
prepareServer(new ReadHandler()); prepareServer(new ReadHandler());
Assert.assertEquals(2, _availableThreads); _threadPool.setMaxThreads(5);
_connector.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory()); _connector.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory());
_server.start(); _server.start();
System.err.println(_server.dump()); System.err.println(_server.dump());
// Two idle threads in the pool here. // Three idle threads in the pool here.
// The server will accept the socket in normal mode. // The server will accept the socket in normal mode.
Socket client = new Socket("localhost", _connector.getLocalPort()); Socket client = new Socket("localhost", _connector.getLocalPort());
client.setSoTimeout(10000); client.setSoTimeout(10000);
Thread.sleep(500); Thread.sleep(500);
// Now steal one thread. // Now steal two threads.
CountDownLatch latch = new CountDownLatch(1); CountDownLatch[] latches = new CountDownLatch[2];
_threadPool.execute(() -> for (int i = 0; i < latches.length; ++i)
{ {
try CountDownLatch latch = latches[i] = new CountDownLatch(1);
_threadPool.execute(() ->
{ {
latch.await(); try
} {
catch (InterruptedException ignored) latch.await();
{ }
} catch (InterruptedException ignored)
}); {
}
});
}
InputStream is = client.getInputStream(); InputStream is = client.getInputStream();
OutputStream os = client.getOutputStream(); OutputStream os = client.getOutputStream();
@ -218,56 +223,25 @@ public class ThreadStarvationTest
assertTrue(executionStrategy.isLowOnThreads()); assertTrue(executionStrategy.isLowOnThreads());
} }
// Release the stolen thread. // Release the stolen threads.
latch.countDown(); for (CountDownLatch latch : latches)
latch.countDown();
Thread.sleep(500); Thread.sleep(500);
// Send the rest of the body to unblock the reader thread. // Send the rest of the body to unblock the reader thread.
// This will be run directly by the selector thread, // This will be run directly by the selector thread,
// which therefore will remain in low threads mode. // which then will exit the low threads mode.
os.write("234567890".getBytes(StandardCharsets.UTF_8)); os.write("234567890".getBytes(StandardCharsets.UTF_8));
os.flush(); os.flush();
Thread.sleep(500); Thread.sleep(500);
System.err.println(_threadPool.dump()); System.err.println(_threadPool.dump());
// Back to two idle threads here, but we are still in
// low threads mode because the SelectorProducer has
// not returned from the low threads mode.
String response = IO.toString(is);
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
// Send another request.
// Accepting a new connection will exit the low threads mode.
client = new Socket("localhost", _connector.getLocalPort());
client.setSoTimeout(10000);
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class)) for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class))
{ {
ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy(); ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy();
assertFalse(executionStrategy.isLowOnThreads()); assertFalse(executionStrategy.isLowOnThreads());
} }
is = client.getInputStream();
os = client.getOutputStream();
request = "" +
"PUT / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"1234567890";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
response = IO.toString(is);
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
} }
protected static class ReadHandler extends AbstractHandler protected static class ReadHandler extends AbstractHandler