better spruious wakeup handling and other simplifications
This commit is contained in:
parent
c01a910b69
commit
1a92015471
|
@ -420,11 +420,11 @@ public class BufferUtil
|
|||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Like append, but does not throw {@link BufferOverflowException}
|
||||
* @param to Buffer is flush mode
|
||||
* @param b bytes to fill
|
||||
* @param off offset into byte
|
||||
* @param to Buffer The buffer to fill to. The buffer will be flipped to fill mode and then flipped back to flush mode.
|
||||
* @param b bytes The bytes to fill
|
||||
* @param off offset into bytes
|
||||
* @param len length to fill
|
||||
* @return The position of the valid data before the flipped position.
|
||||
* @return the number of bytes taken from the buffer.
|
||||
*/
|
||||
public static int fill(ByteBuffer to, byte[] b, int off, int len)
|
||||
{
|
||||
|
|
|
@ -147,75 +147,72 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
|||
LOG.debug("{} run", this);
|
||||
if (!isRunning())
|
||||
return;
|
||||
boolean producing;
|
||||
boolean producing = false;
|
||||
try (Lock locked = _locker.lock())
|
||||
{
|
||||
_pendingProducersDispatched--;
|
||||
producing = pendingProducerWait();
|
||||
_pendingProducers++;
|
||||
|
||||
loop: while (isRunning())
|
||||
{
|
||||
try
|
||||
{
|
||||
_produce.await();
|
||||
|
||||
if (_pendingProducersSignalled==0)
|
||||
{
|
||||
// spurious wakeup!
|
||||
continue loop;
|
||||
}
|
||||
|
||||
_pendingProducersSignalled--;
|
||||
if (_state == State.IDLE)
|
||||
{
|
||||
_state = State.PRODUCING;
|
||||
producing = true;
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
_pendingProducers--;
|
||||
}
|
||||
|
||||
break loop;
|
||||
}
|
||||
}
|
||||
|
||||
if (producing)
|
||||
produceConsume();
|
||||
}
|
||||
|
||||
private boolean pendingProducerWait()
|
||||
{
|
||||
if (_pendingProducers<_pendingProducersMax)
|
||||
{
|
||||
try
|
||||
{
|
||||
_pendingProducers++;
|
||||
|
||||
_produce.await();
|
||||
if (_pendingProducersSignalled==0)
|
||||
{
|
||||
// spurious wakeup!
|
||||
if (LOG.isDebugEnabled() && isRunning())
|
||||
LOG.debug("{} SPURIOUS WAKEUP",this);
|
||||
_pendingProducers--;
|
||||
}
|
||||
else
|
||||
{
|
||||
_pendingProducersSignalled--;
|
||||
if (_state == State.IDLE)
|
||||
{
|
||||
_state = State.PRODUCING;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
_pendingProducers--;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void produceConsume()
|
||||
{
|
||||
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} produce enter", this);
|
||||
LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
|
||||
|
||||
producing: while (isRunning())
|
||||
{
|
||||
// If we got here, then we are the thread that is producing.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} producing", this);
|
||||
|
||||
Runnable task = _producer.produce();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} produced {}", this, task);
|
||||
|
||||
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
|
||||
boolean new_pending_producer;
|
||||
boolean run_task_ourselves;
|
||||
boolean keep_producing;
|
||||
|
||||
StringBuilder state = null;
|
||||
|
||||
try (Lock locked = _locker.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
state = new StringBuilder();
|
||||
getString(state);
|
||||
getState(state);
|
||||
state.append("->");
|
||||
}
|
||||
|
||||
// Did we produced a task?
|
||||
if (task == null)
|
||||
{
|
||||
|
@ -261,9 +258,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
|||
if (new_pending_producer)
|
||||
_pendingProducersDispatched++;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
getState(state);
|
||||
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,new_pending_producer,run_task_ourselves,keep_producing);
|
||||
{
|
||||
LOG.debug("{} {} {}",
|
||||
state,
|
||||
run_task_ourselves?(new_pending_producer?"EPC!":"PC"):"PEC",
|
||||
task);
|
||||
}
|
||||
|
||||
if (new_pending_producer)
|
||||
// Spawn a new thread to continue production by running the produce loop.
|
||||
|
@ -316,21 +323,31 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
|||
public String toString()
|
||||
{
|
||||
StringBuilder builder = new StringBuilder();
|
||||
getString(builder);
|
||||
try (Lock locked = _locker.lock())
|
||||
{
|
||||
getState(builder);
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private void getString(StringBuilder builder)
|
||||
{
|
||||
builder.append(getClass().getSimpleName());
|
||||
builder.append('@');
|
||||
builder.append(Integer.toHexString(hashCode()));
|
||||
builder.append('/');
|
||||
builder.append(_producer);
|
||||
builder.append('/');
|
||||
try (Lock locked = _locker.lock())
|
||||
{
|
||||
builder.append(_state);
|
||||
builder.append('/');
|
||||
builder.append(_pendingProducers);
|
||||
builder.append('/');
|
||||
builder.append(_pendingProducersMax);
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private void getState(StringBuilder builder)
|
||||
{
|
||||
builder.append(_state);
|
||||
builder.append('/');
|
||||
builder.append(_pendingProducers);
|
||||
builder.append('/');
|
||||
builder.append(_pendingProducersMax);
|
||||
}
|
||||
|
||||
private class RunProduce implements Runnable
|
||||
|
|
Loading…
Reference in New Issue