better spruious wakeup handling and other simplifications
This commit is contained in:
parent
c01a910b69
commit
1a92015471
|
@ -420,11 +420,11 @@ public class BufferUtil
|
||||||
/* ------------------------------------------------------------ */
|
/* ------------------------------------------------------------ */
|
||||||
/**
|
/**
|
||||||
* Like append, but does not throw {@link BufferOverflowException}
|
* Like append, but does not throw {@link BufferOverflowException}
|
||||||
* @param to Buffer is flush mode
|
* @param to Buffer The buffer to fill to. The buffer will be flipped to fill mode and then flipped back to flush mode.
|
||||||
* @param b bytes to fill
|
* @param b bytes The bytes to fill
|
||||||
* @param off offset into byte
|
* @param off offset into bytes
|
||||||
* @param len length to fill
|
* @param len length to fill
|
||||||
* @return The position of the valid data before the flipped position.
|
* @return the number of bytes taken from the buffer.
|
||||||
*/
|
*/
|
||||||
public static int fill(ByteBuffer to, byte[] b, int off, int len)
|
public static int fill(ByteBuffer to, byte[] b, int off, int len)
|
||||||
{
|
{
|
||||||
|
|
|
@ -147,75 +147,72 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
||||||
LOG.debug("{} run", this);
|
LOG.debug("{} run", this);
|
||||||
if (!isRunning())
|
if (!isRunning())
|
||||||
return;
|
return;
|
||||||
boolean producing;
|
boolean producing = false;
|
||||||
try (Lock locked = _locker.lock())
|
try (Lock locked = _locker.lock())
|
||||||
{
|
{
|
||||||
_pendingProducersDispatched--;
|
_pendingProducersDispatched--;
|
||||||
producing = pendingProducerWait();
|
_pendingProducers++;
|
||||||
|
|
||||||
|
loop: while (isRunning())
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_produce.await();
|
||||||
|
|
||||||
|
if (_pendingProducersSignalled==0)
|
||||||
|
{
|
||||||
|
// spurious wakeup!
|
||||||
|
continue loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
_pendingProducersSignalled--;
|
||||||
|
if (_state == State.IDLE)
|
||||||
|
{
|
||||||
|
_state = State.PRODUCING;
|
||||||
|
producing = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
LOG.debug(e);
|
||||||
|
_pendingProducers--;
|
||||||
|
}
|
||||||
|
|
||||||
|
break loop;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (producing)
|
if (producing)
|
||||||
produceConsume();
|
produceConsume();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean pendingProducerWait()
|
|
||||||
{
|
|
||||||
if (_pendingProducers<_pendingProducersMax)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
_pendingProducers++;
|
|
||||||
|
|
||||||
_produce.await();
|
|
||||||
if (_pendingProducersSignalled==0)
|
|
||||||
{
|
|
||||||
// spurious wakeup!
|
|
||||||
if (LOG.isDebugEnabled() && isRunning())
|
|
||||||
LOG.debug("{} SPURIOUS WAKEUP",this);
|
|
||||||
_pendingProducers--;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
_pendingProducersSignalled--;
|
|
||||||
if (_state == State.IDLE)
|
|
||||||
{
|
|
||||||
_state = State.PRODUCING;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (InterruptedException e)
|
|
||||||
{
|
|
||||||
LOG.debug(e);
|
|
||||||
_pendingProducers--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void produceConsume()
|
private void produceConsume()
|
||||||
{
|
{
|
||||||
|
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} produce enter", this);
|
LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
|
||||||
|
|
||||||
producing: while (isRunning())
|
producing: while (isRunning())
|
||||||
{
|
{
|
||||||
// If we got here, then we are the thread that is producing.
|
// If we got here, then we are the thread that is producing.
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("{} producing", this);
|
|
||||||
|
|
||||||
Runnable task = _producer.produce();
|
Runnable task = _producer.produce();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
|
||||||
LOG.debug("{} produced {}", this, task);
|
|
||||||
|
|
||||||
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
|
|
||||||
boolean new_pending_producer;
|
boolean new_pending_producer;
|
||||||
boolean run_task_ourselves;
|
boolean run_task_ourselves;
|
||||||
boolean keep_producing;
|
boolean keep_producing;
|
||||||
|
|
||||||
|
StringBuilder state = null;
|
||||||
|
|
||||||
try (Lock locked = _locker.lock())
|
try (Lock locked = _locker.lock())
|
||||||
{
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
{
|
||||||
|
state = new StringBuilder();
|
||||||
|
getString(state);
|
||||||
|
getState(state);
|
||||||
|
state.append("->");
|
||||||
|
}
|
||||||
|
|
||||||
// Did we produced a task?
|
// Did we produced a task?
|
||||||
if (task == null)
|
if (task == null)
|
||||||
{
|
{
|
||||||
|
@ -261,9 +258,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
||||||
if (new_pending_producer)
|
if (new_pending_producer)
|
||||||
_pendingProducersDispatched++;
|
_pendingProducersDispatched++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
getState(state);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,new_pending_producer,run_task_ourselves,keep_producing);
|
{
|
||||||
|
LOG.debug("{} {} {}",
|
||||||
|
state,
|
||||||
|
run_task_ourselves?(new_pending_producer?"EPC!":"PC"):"PEC",
|
||||||
|
task);
|
||||||
|
}
|
||||||
|
|
||||||
if (new_pending_producer)
|
if (new_pending_producer)
|
||||||
// Spawn a new thread to continue production by running the produce loop.
|
// Spawn a new thread to continue production by running the produce loop.
|
||||||
|
@ -316,21 +323,31 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
getString(builder);
|
||||||
|
try (Lock locked = _locker.lock())
|
||||||
|
{
|
||||||
|
getState(builder);
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getString(StringBuilder builder)
|
||||||
|
{
|
||||||
builder.append(getClass().getSimpleName());
|
builder.append(getClass().getSimpleName());
|
||||||
builder.append('@');
|
builder.append('@');
|
||||||
builder.append(Integer.toHexString(hashCode()));
|
builder.append(Integer.toHexString(hashCode()));
|
||||||
builder.append('/');
|
builder.append('/');
|
||||||
builder.append(_producer);
|
builder.append(_producer);
|
||||||
builder.append('/');
|
builder.append('/');
|
||||||
try (Lock locked = _locker.lock())
|
}
|
||||||
{
|
|
||||||
builder.append(_state);
|
private void getState(StringBuilder builder)
|
||||||
builder.append('/');
|
{
|
||||||
builder.append(_pendingProducers);
|
builder.append(_state);
|
||||||
builder.append('/');
|
builder.append('/');
|
||||||
builder.append(_pendingProducersMax);
|
builder.append(_pendingProducers);
|
||||||
}
|
builder.append('/');
|
||||||
return builder.toString();
|
builder.append(_pendingProducersMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RunProduce implements Runnable
|
private class RunProduce implements Runnable
|
||||||
|
|
Loading…
Reference in New Issue