460211 Fixed Idle race in ExecuteProduceRun

Reimplemented ExecuteProduceRun with a spin lock
This commit is contained in:
Greg Wilkins 2015-02-18 21:46:13 +11:00
parent d85662bd6e
commit e18573f4a3
14 changed files with 831 additions and 265 deletions

View File

@ -79,7 +79,7 @@ public class AbstractTest
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
connector = new ServerConnector(server, connectionFactory);
connector = new ServerConnector(server, 1,1, connectionFactory);
server.addConnector(connector);
}

View File

@ -88,6 +88,8 @@ public class HTTP2Connection extends AbstractConnection implements Connection.Up
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onFillable {} ", this);
executionStrategy.execute();
}

View File

@ -74,8 +74,12 @@ public abstract class FillInterest
public void fillable()
{
Callback callback = _interested.get();
if (LOG.isDebugEnabled())
LOG.debug("{} fillable {}",this,callback);
if (callback != null && _interested.compareAndSet(callback, null))
callback.succeeded();
else
LOG.debug("{} lost race {}",this,callback);
}
/**

View File

@ -304,19 +304,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param connection the connection just opened
*/
public void connectionOpened(Connection connection)
public void connectionOpened(final Connection connection)
{
try
// TODO remove this execution
getExecutor().execute(new Runnable()
{
connection.onOpen();
}
catch (Throwable x)
{
if (isRunning())
LOG.warn("Exception while notifying connection " + connection, x);
else
LOG.debug("Exception while notifying connection " + connection, x);
}
@Override
public void run()
{
try
{
connection.onOpen();
}
catch (Throwable x)
{
if (isRunning())
LOG.warn("Exception while notifying connection " + connection, x);
else
LOG.debug("Exception while notifying connection " + connection, x);
}
}
});
}
/**

View File

@ -30,11 +30,14 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* HttpServer Tester.
*/
@RunWith(AdvancedRunner.class)
public abstract class ConnectorCloseTestBase extends HttpServerTestFixture
{
private static String __content =

View File

@ -34,13 +34,16 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public abstract class ConnectorTimeoutTest extends HttpServerTestFixture
{
@Rule

View File

@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
@ -40,13 +41,17 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceRun;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -61,6 +66,7 @@ import static org.junit.Assert.assertTrue;
/**
*
*/
@RunWith(AdvancedRunner.class)
public abstract class HttpServerTestBase extends HttpServerTestFixture
{
private static final String REQUEST1_HEADER = "POST / HTTP/1.0\n" + "Host: localhost\n" + "Content-Type: text/xml; charset=utf-8\n" + "Connection: close\n" + "Content-Length: ";

View File

@ -35,10 +35,12 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.PropertyFlag;
import org.eclipse.jetty.util.IO;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
public class HttpServerTestFixture
{ // Useful constants

View File

@ -20,11 +20,11 @@ package org.eclipse.jetty.util.thread;
import java.lang.reflect.Constructor;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceRun;
/**
* <p>An {@link ExecutionStrategy} executes {@link Runnable} tasks produced by a {@link Producer}.
@ -84,256 +84,4 @@ public interface ExecutionStrategy
return new ExecuteProduceRun(producer,executor);
}
}
/**
* <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p>
*/
public static class ProduceRun implements ExecutionStrategy
{
private final Producer _producer;
public ProduceRun(Producer producer)
{
this._producer = producer;
}
@Override
public void execute()
{
// Iterate until we are complete.
while (true)
{
// Produce a task.
Runnable task = _producer.produce();
if (task == null)
break;
// run the task.
task.run();
}
}
}
/**
* <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p>
*/
public static class ProduceExecuteRun implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private final Producer _producer;
private final Executor _executor;
public ProduceExecuteRun(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
}
@Override
public void execute()
{
// Iterate until we are complete.
while (true)
{
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} PER produced {}",_producer,task);
if (task == null)
break;
// Execute the task.
_executor.execute(task);
}
}
}
/**
* <p>A strategy where the thread calls produce will always run the resulting task
* itself. The strategy may dispatches another thread to continue production.
* </p>
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should
* not produce a task that it does not intend to run. By making producers run the
* task that they have just produced avoids execution delays and avoids parallel slow
* down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back
* pressure on producers.
* </p>
*/
public static class ExecuteProduceRun implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
private final Producer _producer;
private final Executor _executor;
public ExecuteProduceRun(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
}
@Override
public void execute()
{
while (true)
{
State state = _state.get();
switch (state)
{
case IDLE:
if (!_state.compareAndSet(state, State.PENDING))
continue;
run();
return;
default:
return;
}
}
}
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR executed",_producer);
// A new thread has arrived, so clear the PENDING
// flag and try to set the PRODUCING flag.
if (!clearPendingTryProducing())
return;
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR producing",_producer);
// If we got here, then we are the thread that is producing.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} EPR produced {}",_producer,task);
// If no task was produced...
if (task == null)
{
// ...and we are the thread that sets the IDLE flag,
// then production has stopped.
tryIdle();
return;
}
// We have produced, so clear the PRODUCING flag
// and try to set the PENDING flag.
if (clearProducingTryPending())
{
if (LOG.isDebugEnabled())
LOG.debug("{} EPR executed self",_producer);
// Spawn a new thread to continue production.
_executor.execute(this);
}
// Run the task.
task.run();
// Once we have run the task, we can try producing again.
if (!tryProducing())
return;
}
}
private boolean tryProducing()
{
while (true)
{
State state = _state.get();
switch (state)
{
case PENDING:
if (!_state.compareAndSet(state, State.PRODUCING_PENDING))
continue;
return true;
default:
return false;
}
}
}
private boolean clearProducingTryPending()
{
while (true)
{
State state = _state.get();
switch (state)
{
case PRODUCING:
if (!_state.compareAndSet(state, State.PENDING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state, State.PENDING))
continue;
return false;
default:
throw new IllegalStateException();
}
}
}
private boolean clearPendingTryProducing()
{
while (true)
{
State state = _state.get();
switch (state)
{
case IDLE:
return false;
case PENDING:
if (!_state.compareAndSet(state, State.PRODUCING))
continue;
return true;
case PRODUCING_PENDING:
if (!_state.compareAndSet(state, State.PRODUCING))
continue;
return false; // Another thread is already producing
case PRODUCING:
return false; // Another thread is already producing
}
}
}
private boolean tryIdle()
{
while (true)
{
State state = _state.get();
switch (state)
{
case PRODUCING:
case PRODUCING_PENDING:
if (!_state.compareAndSet(state, State.IDLE))
continue;
return true;
default:
return false;
}
}
}
private enum State
{
IDLE, PRODUCING, PENDING, PRODUCING_PENDING
}
}
}

View File

@ -0,0 +1,59 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicBoolean;
/* ------------------------------------------------------------ */
/**
* <pre>
* try(SpinLock.Lock lock = spinlock.lock())
* {
* // something very quick and non blocking
* }
* </pre>
*/
public class SpinLock
{
private final AtomicBoolean _lock = new AtomicBoolean(false);
private final Lock _unlock = new Lock();
public Lock lock()
{
while(true)
{
if (!_lock.compareAndSet(false,true))
{
continue;
}
return _unlock;
}
}
public class Lock implements AutoCloseable
{
@Override
public void close()
{
if (!_lock.compareAndSet(true,false))
throw new IllegalStateException();
}
}
}

View File

@ -0,0 +1,216 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.SpinLock;
import org.eclipse.jetty.util.thread.SpinLock.Lock;
/**
* <p>A strategy where the thread calls produce will always run the resulting task
* itself. The strategy may dispatches another thread to continue production.
* </p>
* <p>The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should
* not produce a task that it does not intend to run. By making producers run the
* task that they have just produced avoids execution delays and avoids parallel slow
* down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back
* pressure on producers.
* </p>
*/
public class ExecuteProduceRun implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceRun.class);
private final SpinLock _lock = new SpinLock();
private final Producer _producer;
private final Executor _executor;
private boolean _idle=true;
private boolean _execute;
private boolean _producing;
private boolean _pending;
public ExecuteProduceRun(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
}
@Override
public void execute()
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
boolean produce=false;
try (Lock locked = _lock.lock())
{
// If we are idle and a thread is not producing
if (_idle)
{
if (_producing)
throw new IllegalStateException();
// Then this thread will do the producing
produce=_producing=true;
// and we are no longer idle
_idle=false;
}
else
{
// Otherwise, lets tell the producing thread
// that it should call produce again before going idle
_execute=true;
}
}
if (produce)
produce();
}
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("{} run",this);
boolean produce=false;
try (Lock locked = _lock.lock())
{
_pending=false;
if (!_idle && !_producing)
{
produce=_producing=true;
}
}
if (produce)
produce();
}
private void produce()
{
if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this);
loop: while (true)
{
// If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing",this);
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}",this,task);
boolean dispatch=false;
try (Lock locked = _lock.lock())
{
// Finished producing
_producing=false;
// Did we produced a task?
if (task == null)
{
// There is no task.
if (_execute)
{
_idle=false;
_producing=true;
_execute=false;
continue loop;
}
// ... and no additional calls to execute, so we are idle
_idle=true;
break loop;
}
// We have a task, which we will run ourselves,
// so if we don't have another thread pending
if (!_pending)
{
// dispatch one
dispatch=_pending=true;
}
_execute=false;
}
// If we became pending
if (dispatch)
{
// Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this);
_executor.execute(this);
}
// Run the task.
if (LOG.isDebugEnabled())
LOG.debug("{} run {}",this,task);
task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}",this,task);
// Once we have run the task, we can try producing again.
try (Lock locked = _lock.lock())
{
// Is another thread already producing or we are now idle?
if (_producing || _idle)
break loop;
_producing=true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} produce exit",this);
}
public Boolean isIdle()
{
try (Lock locked = _lock.lock())
{
return _idle;
}
}
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("EPR ");
builder.append(" ");
try (Lock locked = _lock.lock())
{
builder.append(_idle?"Idle/":"");
builder.append(_producing?"Prod/":"");
builder.append(_pending?"Pend/":"");
builder.append(_execute?"Exec/":"");
}
builder.append(_producer);
return builder.toString();
}
}

View File

@ -0,0 +1,61 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
/**
* <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p>
*/
public class ProduceExecuteRun implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private final Producer _producer;
private final Executor _executor;
public ProduceExecuteRun(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
}
@Override
public void execute()
{
// Iterate until we are complete.
while (true)
{
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} PER produced {}",_producer,task);
if (task == null)
break;
// Execute the task.
_executor.execute(task);
}
}
}

View File

@ -0,0 +1,54 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
/**
* <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p>
*/
public class ProduceRun implements ExecutionStrategy
{
private final Producer _producer;
public ProduceRun(Producer producer)
{
this._producer = producer;
}
@Override
public void execute()
{
// Iterate until we are complete.
while (true)
{
// Produce a task.
Runnable task = _producer.produce();
if (task == null)
break;
// run the task.
task.run();
}
}
}

View File

@ -0,0 +1,400 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ExecuteProduceRunTest
{
private final Runnable NULLTASK = new Runnable()
{
@Override
public void run()
{
}
};
ExecuteProduceRun _ewyk;
final BlockingQueue<Runnable> _produce = new BlockingArrayQueue<>();
final Queue<Runnable> _executions = new ConcurrentArrayQueue<>();
volatile Thread _producer;
@Before
public void before()
{
_executions.clear();
Producer producer = new Producer()
{
@Override
public Runnable produce()
{
try
{
_producer=Thread.currentThread();
Runnable task= _produce.take();
if (task==NULLTASK)
return null;
return task;
}
catch(InterruptedException e)
{
e.printStackTrace();
return null;
}
finally
{
_producer=null;
}
}
};
Executor executor = new Executor()
{
@Override
public void execute(Runnable task)
{
_executions.add(task);
}
};
_ewyk = new ExecuteProduceRun(producer,executor);
}
@After
public void after()
{
// All done and checked
assertThat(_produce.size(),is(0));
assertThat(_executions.size(),is(0));
}
@Test
public void testIdle()
{
_produce.add(NULLTASK);
_ewyk.execute();
}
@Test
public void testProduceOneNonBlockingTask()
{
Task t0 = new Task();
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
assertThat(t0.hasRun(),is(true));
Assert.assertEquals(_ewyk,_executions.poll());
}
@Test
public void testProduceManyNonBlockingTask()
{
Task[] t = new Task[10];
for (int i=0;i<t.length;i++)
{
t[i]=new Task();
_produce.add(t[i]);
}
_produce.add(NULLTASK);
_ewyk.execute();
for (int i=0;i<t.length;i++)
assertThat(t[i].hasRun(),is(true));
Assert.assertEquals(_ewyk,_executions.poll());
}
@Test
public void testProduceOneBlockingTaskIdleByDispatch() throws Exception
{
final Task t0 = new Task(true);
Thread thread = new Thread()
{
@Override
public void run()
{
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
}
};
thread.start();
// wait for execute thread to block in
t0.awaitRun();
assertEquals(thread,t0.getThread());
// Should have dispatched only one helper
assertEquals(_ewyk,_executions.poll());
// which is make us idle
_ewyk.run();
assertThat(_ewyk.isIdle(),is(true));
// unblock task
t0.unblock();
// will run to completion because are already idle
thread.join();
}
@Test
public void testProduceOneBlockingTaskIdleByTask() throws Exception
{
final Task t0 = new Task(true);
Thread thread = new Thread()
{
@Override
public void run()
{
_produce.add(t0);
_produce.add(NULLTASK);
_ewyk.execute();
}
};
thread.start();
// wait for execute thread to block in
t0.awaitRun();
// Should have dispatched only one helper
Assert.assertEquals(_ewyk,_executions.poll());
// unblock task
t0.unblock();
// will run to completion because are become idle
thread.join();
assertThat(_ewyk.isIdle(),is(true));
// because we are idle, dispatched thread is noop
_ewyk.run();
}
@Test
public void testBlockedInProduce() throws Exception
{
final Task t0 = new Task(true);
Thread thread0 = new Thread()
{
@Override
public void run()
{
_produce.add(t0);
_ewyk.execute();
}
};
thread0.start();
// wait for execute thread to block in task
t0.awaitRun();
assertEquals(thread0,t0.getThread());
// Should have dispatched another helper
Assert.assertEquals(_ewyk,_executions.poll());
// dispatched thread will block in produce
Thread thread1 = new Thread(_ewyk);
thread1.start();
// Spin
while(_producer==null)
Thread.yield();
// thread1 is blocked in producing
assertEquals(thread1,_producer);
// because we are producing, any other dispatched threads are noops
_ewyk.run();
// ditto with execute
_ewyk.execute();
// Now if unblock the production by the dispatched thread
final Task t1 = new Task(true);
_produce.add(t1);
// task will be run by thread1
t1.awaitRun();
assertEquals(thread1,t1.getThread());
// and another thread will have been requested
Assert.assertEquals(_ewyk,_executions.poll());
// If we unblock t1, it will overtake t0 and try to produce again!
t1.unblock();
// Now thread1 is producing again
while(_producer==null)
Thread.yield();
assertEquals(thread1,_producer);
// If we unblock t0, it will decide it is not needed
t0.unblock();
thread0.join();
// If the requested extra thread turns up, it is also noop because we are producing
_ewyk.run();
// Give the idle job
_produce.add(NULLTASK);
// Which will eventually idle the producer
thread1.join();
assertEquals(null,_producer);
}
@Test
public void testExecuteWhileIdling() throws Exception
{
final Task t0 = new Task(true);
Thread thread0 = new Thread()
{
@Override
public void run()
{
_produce.add(t0);
_ewyk.execute();
}
};
thread0.start();
// wait for execute thread to block in task
t0.awaitRun();
assertEquals(thread0,t0.getThread());
// Should have dispatched another helper
Assert.assertEquals(_ewyk,_executions.poll());
// We will go idle when we next produce
_produce.add(NULLTASK);
// execute will return immediately because it did not yet see the idle.
_ewyk.execute();
// When we unblock t0, thread1 will see the idle,
t0.unblock();
// but because there was a pending execute it will try producing again
while(_producer==null)
Thread.yield();
assertEquals(thread0,_producer);
// and will see new tasks
final Task t1 = new Task(true);
_produce.add(t1);
t1.awaitRun();
assertThat(t1.getThread(),is(thread0));
// Should NOT have dispatched another helper, because the last is still pending
assertThat(_executions.size(),is(0));
// When the dispatched thread turns up, it will see the second idle
_produce.add(NULLTASK);
_ewyk.run();
assertThat(_ewyk.isIdle(),is(true));
// So that when t1 completes it does not produce again.
t1.unblock();
thread0.join();
}
public static class Task implements Runnable
{
final CountDownLatch _block = new CountDownLatch(1);
final CountDownLatch _run = new CountDownLatch(1);
volatile Thread _thread;
public Task()
{
this(false);
}
public Task(boolean block)
{
if (!block)
_block.countDown();
}
@Override
public void run()
{
try
{
_thread=Thread.currentThread();
_run.countDown();
_block.await();
}
catch (InterruptedException e)
{
throw new IllegalStateException(e);
}
finally
{
_thread=null;
}
}
public boolean hasRun()
{
return _run.getCount()<=0;
}
public void awaitRun()
{
try
{
_run.await();
}
catch (InterruptedException e)
{
throw new IllegalStateException(e);
}
}
public void unblock()
{
_block.countDown();
}
public Thread getThread()
{
return _thread;
}
}
}