better spruious wakeup handling and other simplifications

This commit is contained in:
Greg Wilkins 2017-03-21 09:23:53 +11:00
parent c01a910b69
commit 1a92015471
2 changed files with 76 additions and 59 deletions

View File

@ -420,11 +420,11 @@ public class BufferUtil
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Like append, but does not throw {@link BufferOverflowException} * Like append, but does not throw {@link BufferOverflowException}
* @param to Buffer is flush mode * @param to Buffer The buffer to fill to. The buffer will be flipped to fill mode and then flipped back to flush mode.
* @param b bytes to fill * @param b bytes The bytes to fill
* @param off offset into byte * @param off offset into bytes
* @param len length to fill * @param len length to fill
* @return The position of the valid data before the flipped position. * @return the number of bytes taken from the buffer.
*/ */
public static int fill(ByteBuffer to, byte[] b, int off, int len) public static int fill(ByteBuffer to, byte[] b, int off, int len)
{ {

View File

@ -147,75 +147,72 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
LOG.debug("{} run", this); LOG.debug("{} run", this);
if (!isRunning()) if (!isRunning())
return; return;
boolean producing; boolean producing = false;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
_pendingProducersDispatched--; _pendingProducersDispatched--;
producing = pendingProducerWait(); _pendingProducers++;
loop: while (isRunning())
{
try
{
_produce.await();
if (_pendingProducersSignalled==0)
{
// spurious wakeup!
continue loop;
}
_pendingProducersSignalled--;
if (_state == State.IDLE)
{
_state = State.PRODUCING;
producing = true;
}
}
catch (InterruptedException e)
{
LOG.debug(e);
_pendingProducers--;
}
break loop;
}
} }
if (producing) if (producing)
produceConsume(); produceConsume();
} }
private boolean pendingProducerWait()
{
if (_pendingProducers<_pendingProducersMax)
{
try
{
_pendingProducers++;
_produce.await();
if (_pendingProducersSignalled==0)
{
// spurious wakeup!
if (LOG.isDebugEnabled() && isRunning())
LOG.debug("{} SPURIOUS WAKEUP",this);
_pendingProducers--;
}
else
{
_pendingProducersSignalled--;
if (_state == State.IDLE)
{
_state = State.PRODUCING;
return true;
}
}
}
catch (InterruptedException e)
{
LOG.debug(e);
_pendingProducers--;
}
}
return false;
}
private void produceConsume() private void produceConsume()
{ {
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produce enter", this); LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
producing: while (isRunning()) producing: while (isRunning())
{ {
// If we got here, then we are the thread that is producing. // If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing", this);
Runnable task = _producer.produce(); Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
boolean new_pending_producer; boolean new_pending_producer;
boolean run_task_ourselves; boolean run_task_ourselves;
boolean keep_producing; boolean keep_producing;
StringBuilder state = null;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
if (LOG.isDebugEnabled())
{
state = new StringBuilder();
getString(state);
getState(state);
state.append("->");
}
// Did we produced a task? // Did we produced a task?
if (task == null) if (task == null)
{ {
@ -261,9 +258,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
if (new_pending_producer) if (new_pending_producer)
_pendingProducersDispatched++; _pendingProducersDispatched++;
} }
if (LOG.isDebugEnabled())
getState(state);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,new_pending_producer,run_task_ourselves,keep_producing); {
LOG.debug("{} {} {}",
state,
run_task_ourselves?(new_pending_producer?"EPC!":"PC"):"PEC",
task);
}
if (new_pending_producer) if (new_pending_producer)
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
@ -316,21 +323,31 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
public String toString() public String toString()
{ {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
getString(builder);
try (Lock locked = _locker.lock())
{
getState(builder);
}
return builder.toString();
}
private void getString(StringBuilder builder)
{
builder.append(getClass().getSimpleName()); builder.append(getClass().getSimpleName());
builder.append('@'); builder.append('@');
builder.append(Integer.toHexString(hashCode())); builder.append(Integer.toHexString(hashCode()));
builder.append('/'); builder.append('/');
builder.append(_producer); builder.append(_producer);
builder.append('/'); builder.append('/');
try (Lock locked = _locker.lock()) }
{
builder.append(_state); private void getState(StringBuilder builder)
builder.append('/'); {
builder.append(_pendingProducers); builder.append(_state);
builder.append('/'); builder.append('/');
builder.append(_pendingProducersMax); builder.append(_pendingProducers);
} builder.append('/');
return builder.toString(); builder.append(_pendingProducersMax);
} }
private class RunProduce implements Runnable private class RunProduce implements Runnable