Fixed push

This commit is contained in:
Greg Wilkins 2017-03-17 13:41:45 +11:00
parent d2d6bc3e65
commit c1d92ebde6
3 changed files with 33 additions and 51 deletions

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection public class HTTP2Connection extends AbstractConnection
@ -148,7 +147,7 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch) protected void offerTask(Runnable task, boolean dispatch)
{ {
offerTask(task); offerTask(task);
strategy.produce(); strategy.dispatch();
} }
@Override @Override

View File

@ -86,9 +86,6 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
@Override @Override
public void produce() public void produce()
{ {
if (LOG.isDebugEnabled())
LOG.debug("{} execute", this);
boolean produce; boolean produce;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
@ -109,6 +106,9 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("{} execute {}", this, produce);
if (produce) if (produce)
produceConsume(); produceConsume();
} }
@ -116,8 +116,6 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
@Override @Override
public void dispatch() public void dispatch()
{ {
if (LOG.isDebugEnabled())
LOG.debug("{} spawning", this);
boolean dispatch = false; boolean dispatch = false;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
@ -136,8 +134,10 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
dispatch = false; dispatch = false;
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch {}", this, dispatch);
if (dispatch) if (dispatch)
_executor.execute(_runProduce); _executor.execute(_runProduce,InvocationType.BLOCKING);
} }
@Override @Override
@ -170,6 +170,8 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
if (_pendingProducersSignalled==0) if (_pendingProducersSignalled==0)
{ {
// spurious wakeup! // spurious wakeup!
if (isRunning())
System.err.println("SPURIOUS!!!!!!!!!!!!!!!!!");
_pendingProducers--; _pendingProducers--;
} }
else else
@ -261,7 +263,7 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
} }
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} mbc={} dnp={} ei={} kp={}", this,may_block_caller,dispatch_new_producer,run_task_ourselves,keep_producing); LOG.debug("{} mbc={} dnp={} run={} kp={}", this,may_block_caller,dispatch_new_producer,run_task_ourselves,keep_producing);
if (dispatch_new_producer) if (dispatch_new_producer)
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
@ -277,21 +279,12 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
if (keep_producing) if (keep_producing)
continue producing; continue producing;
if (may_block_caller) try (Lock locked = _locker.lock())
{ {
try (Lock locked = _locker.lock()) if (_state==State.IDLE)
{ {
switch(_state) _state = State.PRODUCING;
{ continue producing;
case IDLE:
_state = State.PRODUCING;
continue producing;
default:
// Perhaps we can be a pending Producer?
if (pendingProducerWait())
continue producing;
}
} }
} }
@ -314,6 +307,7 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
{ {
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
_pendingProducersSignalled=_pendingProducers+_pendingProducersDispatched;
_pendingProducers=0; _pendingProducers=0;
_produce.signalAll(); _produce.signalAll();
} }

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
import org.eclipse.jetty.util.thread.Invocable;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -55,7 +54,7 @@ public class ExecutionStrategyTest
}); });
} }
QueuedThreadPool threads = new QueuedThreadPool(20); QueuedThreadPool _threads = new QueuedThreadPool(20);
Class<? extends ExecutionStrategy> _strategyClass; Class<? extends ExecutionStrategy> _strategyClass;
ExecutionStrategy _strategy; ExecutionStrategy _strategy;
@ -73,14 +72,14 @@ public class ExecutionStrategyTest
@Before @Before
public void before() throws Exception public void before() throws Exception
{ {
threads.start(); _threads.start();
} }
@After @After
public void after() throws Exception public void after() throws Exception
{ {
LifeCycle.stop(_strategy); LifeCycle.stop(_strategy);
threads.stop(); _threads.stop();
} }
public static abstract class TestProducer implements Producer public static abstract class TestProducer implements Producer
@ -106,7 +105,7 @@ public class ExecutionStrategyTest
} }
}; };
newExecutionStrategy(producer,threads); newExecutionStrategy(producer,_threads);
_strategy.produce(); _strategy.produce();
assertThat(count.get(),greaterThan(0)); assertThat(count.get(),greaterThan(0));
} }
@ -114,7 +113,7 @@ public class ExecutionStrategyTest
@Test @Test
public void simpleTest() throws Exception public void simpleTest() throws Exception
{ {
final int TASKS = 3*threads.getMaxThreads(); final int TASKS = 3*_threads.getMaxThreads();
final CountDownLatch latch = new CountDownLatch(TASKS); final CountDownLatch latch = new CountDownLatch(TASKS);
Producer producer = new TestProducer() Producer producer = new TestProducer()
{ {
@ -138,14 +137,11 @@ public class ExecutionStrategyTest
} }
}; };
newExecutionStrategy(producer,threads); newExecutionStrategy(producer,_threads);
Invocable.invokeNonBlocking(()-> for (int p=0; latch.getCount()>0 && p<TASKS; p++)
{ _strategy.produce();
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
_strategy.produce();
});
assertTrue(latch.await(10,TimeUnit.SECONDS)); assertTrue(latch.await(10,TimeUnit.SECONDS));
} }
@ -153,7 +149,7 @@ public class ExecutionStrategyTest
@Test @Test
public void blockingProducerTest() throws Exception public void blockingProducerTest() throws Exception
{ {
final int TASKS = 3*threads.getMaxThreads(); final int TASKS = 3*_threads.getMaxThreads();
final BlockingQueue<CountDownLatch> q = new ArrayBlockingQueue<>(500); final BlockingQueue<CountDownLatch> q = new ArrayBlockingQueue<>(500);
Producer producer = new TestProducer() Producer producer = new TestProducer()
@ -190,10 +186,13 @@ public class ExecutionStrategyTest
} }
}; };
newExecutionStrategy(producer,threads); newExecutionStrategy(producer,_threads);
_threads.execute(()->_strategy.produce());
final CountDownLatch latch = new CountDownLatch(TASKS); final CountDownLatch latch = new CountDownLatch(TASKS);
threads.execute(new Runnable() _threads.execute(new Runnable()
{ {
@Override @Override
public void run() public void run()
@ -204,6 +203,7 @@ public class ExecutionStrategyTest
{ {
Thread.sleep(20); Thread.sleep(20);
q.offer(latch); q.offer(latch);
_strategy.produce();
} }
} }
catch(Exception e) catch(Exception e)
@ -213,17 +213,6 @@ public class ExecutionStrategyTest
} }
}); });
Invocable.invokeNonBlocking(()->
{
for (int p=0; latch.getCount()>0 && p<TASKS; p++)
_strategy.produce();
});
assertTrue(latch.await(10,TimeUnit.SECONDS)); assertTrue(latch.await(10,TimeUnit.SECONDS));
} }
} }