better spruious wakeup handling and other simplifications

This commit is contained in:
Greg Wilkins 2017-03-21 09:23:53 +11:00
parent c01a910b69
commit 1a92015471
2 changed files with 76 additions and 59 deletions

View File

@ -420,11 +420,11 @@ public class BufferUtil
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Like append, but does not throw {@link BufferOverflowException} * Like append, but does not throw {@link BufferOverflowException}
* @param to Buffer is flush mode * @param to Buffer The buffer to fill to. The buffer will be flipped to fill mode and then flipped back to flush mode.
* @param b bytes to fill * @param b bytes The bytes to fill
* @param off offset into byte * @param off offset into bytes
* @param len length to fill * @param len length to fill
* @return The position of the valid data before the flipped position. * @return the number of bytes taken from the buffer.
*/ */
public static int fill(ByteBuffer to, byte[] b, int off, int len) public static int fill(ByteBuffer to, byte[] b, int off, int len)
{ {

View File

@ -147,41 +147,29 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
LOG.debug("{} run", this); LOG.debug("{} run", this);
if (!isRunning()) if (!isRunning())
return; return;
boolean producing; boolean producing = false;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
_pendingProducersDispatched--; _pendingProducersDispatched--;
producing = pendingProducerWait(); _pendingProducers++;
}
if (producing) loop: while (isRunning())
produceConsume();
}
private boolean pendingProducerWait()
{
if (_pendingProducers<_pendingProducersMax)
{ {
try try
{ {
_pendingProducers++;
_produce.await(); _produce.await();
if (_pendingProducersSignalled==0) if (_pendingProducersSignalled==0)
{ {
// spurious wakeup! // spurious wakeup!
if (LOG.isDebugEnabled() && isRunning()) continue loop;
LOG.debug("{} SPURIOUS WAKEUP",this);
_pendingProducers--;
} }
else
{
_pendingProducersSignalled--; _pendingProducersSignalled--;
if (_state == State.IDLE) if (_state == State.IDLE)
{ {
_state = State.PRODUCING; _state = State.PRODUCING;
return true; producing = true;
}
} }
} }
catch (InterruptedException e) catch (InterruptedException e)
@ -189,33 +177,42 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
LOG.debug(e); LOG.debug(e);
_pendingProducers--; _pendingProducers--;
} }
break loop;
} }
return false; }
if (producing)
produceConsume();
} }
private void produceConsume() private void produceConsume()
{ {
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produce enter", this); LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
producing: while (isRunning()) producing: while (isRunning())
{ {
// If we got here, then we are the thread that is producing. // If we got here, then we are the thread that is producing.
if (LOG.isDebugEnabled())
LOG.debug("{} producing", this);
Runnable task = _producer.produce(); Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
boolean new_pending_producer; boolean new_pending_producer;
boolean run_task_ourselves; boolean run_task_ourselves;
boolean keep_producing; boolean keep_producing;
StringBuilder state = null;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
if (LOG.isDebugEnabled())
{
state = new StringBuilder();
getString(state);
getState(state);
state.append("->");
}
// Did we produced a task? // Did we produced a task?
if (task == null) if (task == null)
{ {
@ -261,9 +258,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
if (new_pending_producer) if (new_pending_producer)
_pendingProducersDispatched++; _pendingProducersDispatched++;
} }
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,new_pending_producer,run_task_ourselves,keep_producing); getState(state);
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} {} {}",
state,
run_task_ourselves?(new_pending_producer?"EPC!":"PC"):"PEC",
task);
}
if (new_pending_producer) if (new_pending_producer)
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
@ -316,13 +323,25 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
public String toString() public String toString()
{ {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
getString(builder);
try (Lock locked = _locker.lock())
{
getState(builder);
}
return builder.toString();
}
private void getString(StringBuilder builder)
{
builder.append(getClass().getSimpleName()); builder.append(getClass().getSimpleName());
builder.append('@'); builder.append('@');
builder.append(Integer.toHexString(hashCode())); builder.append(Integer.toHexString(hashCode()));
builder.append('/'); builder.append('/');
builder.append(_producer); builder.append(_producer);
builder.append('/'); builder.append('/');
try (Lock locked = _locker.lock()) }
private void getState(StringBuilder builder)
{ {
builder.append(_state); builder.append(_state);
builder.append('/'); builder.append('/');
@ -330,8 +349,6 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
builder.append('/'); builder.append('/');
builder.append(_pendingProducersMax); builder.append(_pendingProducersMax);
} }
return builder.toString();
}
private class RunProduce implements Runnable private class RunProduce implements Runnable
{ {