470727 - Thread Starvation with EWYK

Implemented NonBlockingCallbacks and fallback to PEC scheduling
This commit is contained in:
Greg Wilkins 2015-07-01 17:18:50 +10:00
parent a87823930a
commit 8d869bf88b
11 changed files with 284 additions and 6 deletions

View File

@ -34,8 +34,10 @@ import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.webapp.Configuration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@Ignore
public class JstlTest
{
private static Server server;

View File

@ -105,6 +105,12 @@ public abstract class FillInterest
return _interested.get() != null;
}
public boolean isCallbackNonBlocking()
{
Callback callback = _interested.get();
return callback instanceof Callback.NonBlocking;
}
/**
* Call to signal a failure to a registered interest
*

View File

@ -269,8 +269,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
SelectableEndPoint selectable = (SelectableEndPoint)attachment;
Runnable task = selectable.onSelected();
Runnable task = ((SelectableEndPoint)attachment).onSelected();
if (task != null)
return task;
}

View File

@ -100,8 +100,23 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
// Call non blocking directly
if (readable && getFillInterest().isCallbackNonBlocking())
{
getFillInterest().fillable();
readable=false;
}
if (writable && getWriteFlusher().isCallbackNonBlocking())
{
getWriteFlusher().completeWrite();
writable=false;
}
// return task to complete the job
return readable ? (writable ? _runFillableCompleteWrite : _runFillable)
: (writable ? _runCompleteWrite : null);
}

View File

@ -268,6 +268,17 @@ abstract public class WriteFlusher
if (_callback!=null)
_callback.succeeded();
}
boolean isCallbackNonBlocking()
{
return _callback instanceof Callback.NonBlocking;
}
}
public boolean isCallbackNonBlocking()
{
State s = _state.get();
return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
}
/**

View File

@ -539,7 +539,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
private class BlockingReadCallback implements Callback
private class BlockingReadCallback implements Callback.NonBlocking
{
@Override
public void succeeded()

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.toolchain.test.PropertyFlag;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Before;
@ -45,6 +46,7 @@ public class HttpServerTestFixture
protected static final long PAUSE=10L;
protected static final int LOOPS= PropertyFlag.isEnabled("test.stress")?250:50;
protected QueuedThreadPool _threadPool;
protected Server _server;
protected URI _serverURI;
protected ServerConnector _connector;
@ -62,15 +64,21 @@ public class HttpServerTestFixture
@Before
public void before()
{
_server = new Server();
_threadPool = new QueuedThreadPool();
_server = new Server(_threadPool);
}
protected void startServer(ServerConnector connector) throws Exception
{
startServer(connector,new HandlerWrapper());
}
protected void startServer(ServerConnector connector, Handler handler) throws Exception
{
_connector = connector;
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
_server.addConnector(_connector);
_server.setHandler(new HandlerWrapper());
_server.setHandler(handler);
_server.start();
_serverURI = _server.getURI();
}

View File

@ -0,0 +1,215 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class ThreadStarvationTest extends HttpServerTestFixture
{
ServerConnector _connector;
@Rule
public TestTracker tracker = new TestTracker();
@Before
public void init() throws Exception
{
_threadPool.setMinThreads(4);
_threadPool.setMaxThreads(4);
_threadPool.setDetailedDump(false);
_connector = new ServerConnector(_server,1,1);
_connector.setIdleTimeout(10000);
}
@Test
public void testReadInput() throws Exception
{
startServer(_connector,new ReadHandler());
System.err.println(_threadPool.dump());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
os.write((
"GET / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
"\r\n" +
"0123456789\r\n").getBytes("utf-8"));
os.flush();
String response = IO.toString(is);
assertEquals(-1, is.read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
}
@Test
public void testEWYKStarvation() throws Exception
{
System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume");
startServer(_connector,new ReadHandler());
Socket[] client = new Socket[3];
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
for (int i=0;i<client.length;i++)
{
client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
client[i].setSoTimeout(10000);
os[i]=client[i].getOutputStream();
is[i]=client[i].getInputStream();
os[i].write((
"PUT / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
"\r\n1").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
os[i].write(("234567890\r\n").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
String response = IO.toString(is[i]);
assertEquals(-1, is[i].read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
}
}
@Test
public void testPECStarvation() throws Exception
{
System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume");
startServer(_connector,new ReadHandler());
System.err.println(_threadPool.dump());
Socket[] client = new Socket[3];
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
for (int i=0;i<client.length;i++)
{
client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
client[i].setSoTimeout(10000);
os[i]=client[i].getOutputStream();
is[i]=client[i].getInputStream();
os[i].write((
"PUT / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
"\r\n1").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
os[i].write(("234567890\r\n").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
String response = IO.toString(is[i]);
assertEquals(-1, is[i].read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
}
}
protected static class ReadHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
int l = request.getContentLength();
int r = 0;
while (r<l)
{
if (request.getInputStream().read()>=0)
r++;
}
response.getOutputStream().write(("Read Input "+r+"\r\n").getBytes());
}
}
}

View File

@ -55,6 +55,13 @@ public interface Callback
*/
public void failed(Throwable x);
/**
* A marker interface for a callback that is guaranteed not to
* block and thus does not need a dispatch
*/
public interface NonBlocking extends Callback
{}
/**
* <p>Empty implementation of {@link Callback}</p>
*/

View File

@ -125,7 +125,7 @@ public class SharedBlockingCallback
* A Closeable Callback.
* Uses the auto close mechanism to check block has been called OK.
*/
public class Blocker implements Callback, Closeable
public class Blocker implements Callback.NonBlocking, Closeable
{
private Throwable _state = IDLE;

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
* <p>A strategy where the thread calls produce will always run the resulting task
@ -52,11 +53,15 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
private boolean _execute;
private boolean _producing;
private boolean _pending;
private final ThreadPool _threadpool;
private final ProduceExecuteConsume _lowresources;
public ExecuteProduceConsume(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
_threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
_lowresources = _threadpool==null?null:new ProduceExecuteConsume(producer,executor);
}
@Override
@ -64,6 +69,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
boolean produce=false;
try (Lock locked = _locker.lock())
{
@ -123,7 +129,16 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
}
if (produce)
{
// If we are low on resources, then switch to PEC strategy which does not
// suffer as badly from thread starvation
while (_threadpool!=null && _threadpool.isLowOnThreads())
{
LOG.debug("EWYK low resources {}",this);
_lowresources.execute();
}
produceAndRun();
}
}
private void produceAndRun()