Fixes #547 (ExecuteProduceConsume (EWYK) does not exit low threads mode)

Exposed getters/setters for ExecutionStrategy.Factory in
ServerConnector and SelectorManager, to allow explicit configuration
and testing.

Added test to verify that EPC exits low threads mode.
This commit is contained in:
Simone Bordet 2016-05-06 16:34:01 +02:00
parent 8bb6d4d7dc
commit caa45283c7
6 changed files with 359 additions and 211 deletions

View File

@ -79,11 +79,17 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
setStopTimeout(5000);
}
public ExecutionStrategy getExecutionStrategy()
{
return _strategy;
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_selector = newSelector();
_selectorManager.execute(this);
}
protected Selector newSelector() throws IOException

View File

@ -27,12 +27,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
/**
@ -41,7 +40,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
* {@link EndPoint}s and {@link Connection}s.</p>
*/
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
public abstract class SelectorManager extends ContainerLifeCycle implements Dumpable
{
public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
protected static final Logger LOG = Log.getLogger(SelectorManager.class);
@ -50,6 +49,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private final Scheduler scheduler;
private final ManagedSelector[] _selectors;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private ExecutionStrategy.Factory _executionFactory = ExecutionStrategy.Factory.getDefault();
private long _selectorIndex;
protected SelectorManager(Executor executor, Scheduler scheduler)
@ -96,6 +96,24 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_connectTimeout = milliseconds;
}
/**
* @return the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
*/
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return _executionFactory;
}
/**
* @param _executionFactory the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
*/
public void setExecutionStrategyFactory(ExecutionStrategy.Factory _executionFactory)
{
if (isRunning())
throw new IllegalStateException("Cannot change " + ExecutionStrategy.Factory.class.getSimpleName() + " after start()");
this._executionFactory = _executionFactory;
}
/**
* @return the selector priority delta
* @deprecated not implemented
@ -246,14 +264,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
@Override
protected void doStart() throws Exception
{
super.doStart();
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selector = newSelector(i);
_selectors[i] = selector;
selector.start();
execute(selector);
addBean(selector);
}
super.doStart();
}
/**
@ -264,15 +281,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(this, id);
return new ManagedSelector(this, id, getExecutionStrategyFactory());
}
@Override
protected void doStop() throws Exception
{
for (ManagedSelector selector : _selectors)
selector.stop();
super.doStop();
for (ManagedSelector selector : _selectors)
removeBean(selector);
}
/**
@ -376,17 +393,4 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
*/
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
}
}

View File

@ -42,6 +42,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
/**
@ -475,6 +476,22 @@ public class ServerConnector extends AbstractNetworkConnector
_reuseAddress = reuseAddress;
}
/**
* @return the ExecutionStrategy factory to use for SelectorManager
*/
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return _manager.getExecutionStrategyFactory();
}
/**
* @param executionFactory the ExecutionStrategy factory to use for SelectorManager
*/
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionFactory)
{
_manager.setExecutionStrategyFactory(executionFactory);
}
protected class ServerConnectorManager extends SelectorManager
{
public ServerConnectorManager(Executor executor, Scheduler scheduler, int selectors)

View File

@ -18,179 +18,259 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import javax.net.ssl.SSLException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class ThreadStarvationTest extends HttpServerTestFixture
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class ThreadStarvationTest
{
ServerConnector _connector;
@Rule
public TestTracker tracker = new TestTracker();
private QueuedThreadPool _threadPool;
private Server _server;
private ServerConnector _connector;
private int _availableThreads;
@Before
public void init() throws Exception
private Server prepareServer(Handler handler)
{
_threadPool.setMinThreads(4);
_threadPool.setMaxThreads(4);
_threadPool.setDetailedDump(false);
_connector = new ServerConnector(_server,1,1);
_connector.setIdleTimeout(10000);
int threads = 4;
_threadPool = new QueuedThreadPool();
_threadPool.setMinThreads(threads);
_threadPool.setMaxThreads(threads);
_threadPool.setDetailedDump(true);
_server = new Server(_threadPool);
int acceptors = 1;
int selectors = 1;
_connector = new ServerConnector(_server, acceptors, selectors);
_server.addConnector(_connector);
_server.setHandler(handler);
_availableThreads = threads - acceptors - selectors;
return _server;
}
@After
public void dispose() throws Exception
{
_server.stop();
}
@Test
public void testReadInput() throws Exception
{
startServer(_connector,new ReadHandler());
System.err.println(_threadPool.dump());
Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
client.setSoTimeout(10000);
prepareServer(new ReadHandler()).start();
OutputStream os=client.getOutputStream();
InputStream is=client.getInputStream();
Socket client = new Socket("localhost", _connector.getLocalPort());
os.write((
"GET / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
OutputStream os = client.getOutputStream();
InputStream is = client.getInputStream();
String request = "" +
"GET / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"0123456789\r\n").getBytes("utf-8"));
"0123456789\r\n";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
String response = IO.toString(is);
assertEquals(-1, is.read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
@Test
public void testEWYKStarvation() throws Exception
public void testEPCStarvation() throws Exception
{
System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume");
startServer(_connector,new ReadHandler());
Socket[] client = new Socket[3];
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
for (int i=0;i<client.length;i++)
{
client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
client[i].setSoTimeout(10000);
os[i]=client[i].getOutputStream();
is[i]=client[i].getInputStream();
os[i].write((
"PUT / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
"\r\n1").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
os[i].write(("234567890\r\n").getBytes("utf-8"));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
{
String response = IO.toString(is[i]);
assertEquals(-1, is[i].read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
}
testStarvation(new ExecuteProduceConsume.Factory());
}
@Test
public void testPECStarvation() throws Exception
{
System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume");
testStarvation(new ProduceExecuteConsume.Factory());
}
startServer(_connector,new ReadHandler());
private void testStarvation(ExecutionStrategy.Factory executionFactory) throws Exception
{
prepareServer(new ReadHandler());
_connector.setExecutionStrategyFactory(executionFactory);
_server.start();
System.err.println(_threadPool.dump());
Socket[] client = new Socket[3];
Socket[] client = new Socket[_availableThreads + 1];
OutputStream[] os = new OutputStream[client.length];
InputStream[] is = new InputStream[client.length];
for (int i=0;i<client.length;i++)
for (int i = 0; i < client.length; i++)
{
client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
client[i] = new Socket("localhost", _connector.getLocalPort());
client[i].setSoTimeout(10000);
os[i]=client[i].getOutputStream();
is[i]=client[i].getInputStream();
os[i] = client[i].getOutputStream();
is[i] = client[i].getInputStream();
os[i].write((
"PUT / HTTP/1.0\r\n"+
"host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
"content-length: 10\r\n" +
"\r\n1").getBytes("utf-8"));
String request = "" +
"PUT / HTTP/1.0\r\n" +
"host: localhost\r\n" +
"content-length: 10\r\n" +
"\r\n" +
"1";
os[i].write(request.getBytes(StandardCharsets.UTF_8));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
for (int i = 0; i < client.length; i++)
{
os[i].write(("234567890\r\n").getBytes("utf-8"));
os[i].write(("234567890\r\n").getBytes(StandardCharsets.UTF_8));
os[i].flush();
}
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (int i=0;i<client.length;i++)
for (int i = 0; i < client.length; i++)
{
String response = IO.toString(is[i]);
assertEquals(-1, is[i].read());
assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10"));
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
}
@Test
public void testEPCExitsLowThreadsMode() throws Exception
{
prepareServer(new ReadHandler());
Assert.assertEquals(2, _availableThreads);
_connector.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory());
_server.start();
System.err.println(_server.dump());
// Two idle threads in the pool here.
// The server will accept the socket in normal mode.
Socket client = new Socket("localhost", _connector.getLocalPort());
client.setSoTimeout(10000);
Thread.sleep(500);
// Now steal one thread.
CountDownLatch latch = new CountDownLatch(1);
_threadPool.execute(() ->
{
try
{
latch.await();
}
catch (InterruptedException ignored)
{
}
});
InputStream is = client.getInputStream();
OutputStream os = client.getOutputStream();
String request = "" +
"PUT / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"1";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(500);
System.err.println(_threadPool.dump());
// Request did not send the whole body, Handler
// is blocked reading, zero idle threads here,
// EPC is in low threads mode.
for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class))
{
ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy();
assertTrue(executionStrategy.isLowOnThreads());
}
// Release the stolen thread.
latch.countDown();
Thread.sleep(500);
// Send the rest of the body to unblock the reader thread.
// This will be run directly by the selector thread,
// which therefore will remain in low threads mode.
os.write("234567890".getBytes(StandardCharsets.UTF_8));
os.flush();
Thread.sleep(500);
System.err.println(_threadPool.dump());
// Back to two idle threads here, but we are still in
// low threads mode because the SelectorProducer has
// not returned from the low threads mode.
String response = IO.toString(is);
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
// Send another request.
// Accepting a new connection will exit the low threads mode.
client = new Socket("localhost", _connector.getLocalPort());
client.setSoTimeout(10000);
Thread.sleep(500);
System.err.println(_threadPool.dump());
for (ManagedSelector selector : _connector.getSelectorManager().getBeans(ManagedSelector.class))
{
ExecuteProduceConsume executionStrategy = (ExecuteProduceConsume)selector.getExecutionStrategy();
assertFalse(executionStrategy.isLowOnThreads());
}
is = client.getInputStream();
os = client.getOutputStream();
request = "" +
"PUT / HTTP/1.0\r\n" +
"Host: localhost\r\n" +
"Content-Length: 10\r\n" +
"\r\n" +
"1234567890";
os.write(request.getBytes(StandardCharsets.UTF_8));
os.flush();
response = IO.toString(is);
assertThat(response, containsString("200 OK"));
assertThat(response, containsString("Read Input 10"));
}
protected static class ReadHandler extends AbstractHandler
{
@ -202,13 +282,13 @@ public class ThreadStarvationTest extends HttpServerTestFixture
int l = request.getContentLength();
int r = 0;
while (r<l)
while (r < l)
{
if (request.getInputStream().read()>=0)
if (request.getInputStream().read() >= 0)
r++;
}
response.getOutputStream().write(("Read Input "+r+"\r\n").getBytes());
response.getOutputStream().write(("Read Input " + r + "\r\n").getBytes());
}
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -50,32 +49,33 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute();
private final Producer _producer;
private boolean _idle=true;
private final ThreadPool _threadPool;
private boolean _idle = true;
private boolean _execute;
private boolean _producing;
private boolean _pending;
private final ThreadPool _threadpool;
private boolean _lowThreads;
public ExecuteProduceConsume(Producer producer, Executor executor)
{
super(executor);
this._producer = producer;
_threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
_threadPool = executor instanceof ThreadPool ? (ThreadPool)executor : null;
}
@Deprecated
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
{
this(producer,executor);
this(producer, executor);
}
@Override
public void execute()
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
LOG.debug("{} execute", this);
boolean produce=false;
boolean produce = false;
try (Lock locked = _locker.lock())
{
// If we are idle and a thread is not producing
@ -85,15 +85,15 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
throw new IllegalStateException();
// Then this thread will do the producing
produce=_producing=true;
produce = _producing = true;
// and we are no longer idle
_idle=false;
_idle = false;
}
else
{
// Otherwise, lets tell the producing thread
// that it should call produce again before going idle
_execute=true;
_execute = true;
}
}
@ -105,14 +105,14 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
public void dispatch()
{
if (LOG.isDebugEnabled())
LOG.debug("{} spawning",this);
boolean dispatch=false;
LOG.debug("{} spawning", this);
boolean dispatch = false;
try (Lock locked = _locker.lock())
{
if (_idle)
dispatch=true;
dispatch = true;
else
_execute=true;
_execute = true;
}
if (dispatch)
execute(_runExecute);
@ -122,26 +122,36 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} run",this);
boolean produce=false;
LOG.debug("{} run", this);
boolean produce = false;
try (Lock locked = _locker.lock())
{
_pending=false;
_pending = false;
if (!_idle && !_producing)
{
produce=_producing=true;
produce = _producing = true;
}
}
if (produce)
{
// If we are low on threads, this could be the last thread, so we must not consume.
// So call produceExecuteConsume instead
if (_threadpool!=null && _threadpool.isLowOnThreads() && !produceExecuteConsume())
return;
produceConsume();
}
private void produceConsume()
{
if (_threadPool != null && _threadPool.isLowOnThreads())
{
// If we are low on threads we must not produce and consume
// in the same thread, but produce and execute to consume.
if (!produceExecuteConsume())
return;
}
executeProduceConsume();
}
public boolean isLowOnThreads()
{
return _lowThreads;
}
/**
@ -150,55 +160,70 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
private boolean produceExecuteConsume()
{
if (LOG.isDebugEnabled())
LOG.debug("{} Low Resources",this);
while (_threadpool.isLowOnThreads())
LOG.debug("{} enter low threads mode", this);
_lowThreads = true;
try
{
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", _producer, task);
if (task == null)
boolean idle = false;
while (_threadPool.isLowOnThreads())
{
// No task, so we are now idle
try (Lock locked = _locker.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("{} Idle Low Resources",this);
_producing=false;
_idle=false;
}
return false;
}
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", _producer, task);
// Execute the task.
execute(task);
if (task == null)
{
// No task, so we are now idle
try (Lock locked = _locker.lock())
{
if (_execute)
{
_execute = false;
_producing = true;
_idle = false;
continue;
}
_producing = false;
idle = _idle = true;
break;
}
}
// Execute the task.
execute(task);
}
return !idle;
}
finally
{
_lowThreads = false;
if (LOG.isDebugEnabled())
LOG.debug("{} exit low threads mode", this);
}
if (LOG.isDebugEnabled())
LOG.debug("{} No longer Low Resources",this);
return true;
}
private void produceConsume()
private void executeProduceConsume()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this);
LOG.debug("{} produce enter", this);
while (true)
{
// If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing",this);
LOG.debug("{} producing", this);
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}",this,task);
LOG.debug("{} produced {}", this, task);
boolean dispatch=false;
boolean dispatch = false;
try (Lock locked = _locker.lock())
{
// Finished producing
_producing=false;
_producing = false;
// Did we produced a task?
if (task == null)
@ -207,14 +232,14 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
// Could another one just have been queued with an execute?
if (_execute)
{
_idle=false;
_producing=true;
_execute=false;
_idle = false;
_producing = true;
_execute = false;
continue;
}
// ... and no additional calls to execute, so we are idle
_idle=true;
_idle = true;
break;
}
@ -223,10 +248,10 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
if (!_pending)
{
// dispatch one
dispatch=_pending=true;
dispatch = _pending = true;
}
_execute=false;
_execute = false;
}
// If we became pending
@ -234,18 +259,18 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
{
// Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this);
LOG.debug("{} dispatch", this);
if (!execute(this))
task=null;
task = null;
}
// Run the task.
if (LOG.isDebugEnabled())
LOG.debug("{} run {}",this,task);
LOG.debug("{} run {}", this, task);
if (task != null)
task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}",this,task);
LOG.debug("{} ran {}", this, task);
// Once we have run the task, we can try producing again.
try (Lock locked = _locker.lock())
@ -253,15 +278,14 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
// Is another thread already producing or we are now idle?
if (_producing || _idle)
break;
_producing=true;
_producing = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} produce exit",this);
LOG.debug("{} produce exit", this);
}
public Boolean isIdle()
{
try (Lock locked = _locker.lock())
@ -276,10 +300,10 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
builder.append("EPC ");
try (Lock locked = _locker.lock())
{
builder.append(_idle?"Idle/":"");
builder.append(_producing?"Prod/":"");
builder.append(_pending?"Pend/":"");
builder.append(_execute?"Exec/":"");
builder.append(_idle ? "Idle/" : "");
builder.append(_producing ? "Prod/" : "");
builder.append(_pending ? "Pend/" : "");
builder.append(_execute ? "Exec/" : "");
}
builder.append(_producer);
return builder.toString();

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
/**
* <p>A strategy where the caller thread iterates over task production, submitting each
@ -32,6 +33,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Locker _locker = new Locker();
private final Producer _producer;
private final Executor _executor;
private State _state = State.IDLE;
@ -45,11 +47,19 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
@Override
public void execute()
{
synchronized (this)
try (Locker.Lock lock = _locker.lock())
{
_state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE;
if (_state == State.EXECUTE)
return;
switch(_state)
{
case IDLE:
_state= State.PRODUCE;
break;
case PRODUCE:
case EXECUTE:
_state= State.EXECUTE;
return;
}
}
// Iterate until we are complete.
@ -62,12 +72,19 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
if (task == null)
{
synchronized (this)
try (Locker.Lock lock = _locker.lock())
{
_state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE;
if (_state == State.PRODUCE)
continue;
return;
switch(_state)
{
case IDLE:
throw new IllegalStateException();
case PRODUCE:
_state= State.IDLE;
return;
case EXECUTE:
_state= State.PRODUCE;
continue;
}
}
}