459081 - http2 push failures.
Reverted sense of runnable in ExecuteProduceRun so that test harness passes. Added RunExecute Runnable for the new dispatch semantic
This commit is contained in:
parent
1de53a888d
commit
c5541cb3bd
|
@ -45,7 +45,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(ExecuteProduceRun.class);
|
private static final Logger LOG = Log.getLogger(ExecuteProduceRun.class);
|
||||||
private final SpinLock _lock = new SpinLock();
|
private final SpinLock _lock = new SpinLock();
|
||||||
private final Runnable _resumer = new Resumer();
|
private final Runnable _runExecute = new RunExecute();
|
||||||
private final Producer _producer;
|
private final Producer _producer;
|
||||||
private final Executor _executor;
|
private final Executor _executor;
|
||||||
private boolean _idle=true;
|
private boolean _idle=true;
|
||||||
|
@ -104,16 +104,11 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
|
||||||
_execute=true;
|
_execute=true;
|
||||||
}
|
}
|
||||||
if (dispatch)
|
if (dispatch)
|
||||||
_executor.execute(this);
|
_executor.execute(_runExecute);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
|
||||||
execute();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void resume()
|
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} run",this);
|
LOG.debug("{} run",this);
|
||||||
|
@ -187,7 +182,7 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
|
||||||
// Spawn a new thread to continue production by running the produce loop.
|
// Spawn a new thread to continue production by running the produce loop.
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} dispatch",this);
|
LOG.debug("{} dispatch",this);
|
||||||
_executor.execute(_resumer);
|
_executor.execute(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the task.
|
// Run the task.
|
||||||
|
@ -234,12 +229,12 @@ public class ExecuteProduceRun implements ExecutionStrategy, Runnable
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Resumer implements Runnable
|
private class RunExecute implements Runnable
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
resume();
|
execute();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
package org.eclipse.jetty.util.thread.strategy;
|
package org.eclipse.jetty.util.thread.strategy;
|
||||||
|
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
@ -27,12 +27,10 @@ import java.util.Queue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||||
import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
||||||
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
|
import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer;
|
||||||
import org.hamcrest.Matchers;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -100,8 +98,8 @@ public class ExecuteProduceRunTest
|
||||||
public void after()
|
public void after()
|
||||||
{
|
{
|
||||||
// All done and checked
|
// All done and checked
|
||||||
assertThat(_produce.size(),is(0));
|
assertThat(_produce.size(),equalTo(0));
|
||||||
assertThat(_executions.size(),is(0));
|
assertThat(_executions.size(),equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -118,7 +116,7 @@ public class ExecuteProduceRunTest
|
||||||
_produce.add(t0);
|
_produce.add(t0);
|
||||||
_produce.add(NULLTASK);
|
_produce.add(NULLTASK);
|
||||||
_ewyk.execute();
|
_ewyk.execute();
|
||||||
assertThat(t0.hasRun(),is(true));
|
assertThat(t0.hasRun(),equalTo(true));
|
||||||
Assert.assertEquals(_ewyk,_executions.poll());
|
Assert.assertEquals(_ewyk,_executions.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +133,7 @@ public class ExecuteProduceRunTest
|
||||||
_ewyk.execute();
|
_ewyk.execute();
|
||||||
|
|
||||||
for (int i=0;i<t.length;i++)
|
for (int i=0;i<t.length;i++)
|
||||||
assertThat(t[i].hasRun(),is(true));
|
assertThat(t[i].hasRun(),equalTo(true));
|
||||||
Assert.assertEquals(_ewyk,_executions.poll());
|
Assert.assertEquals(_ewyk,_executions.poll());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -164,7 +162,7 @@ public class ExecuteProduceRunTest
|
||||||
assertEquals(_ewyk,_executions.poll());
|
assertEquals(_ewyk,_executions.poll());
|
||||||
// which is make us idle
|
// which is make us idle
|
||||||
_ewyk.run();
|
_ewyk.run();
|
||||||
assertThat(_ewyk.isIdle(),is(true));
|
assertThat(_ewyk.isIdle(),equalTo(true));
|
||||||
|
|
||||||
|
|
||||||
// unblock task
|
// unblock task
|
||||||
|
@ -199,7 +197,7 @@ public class ExecuteProduceRunTest
|
||||||
t0.unblock();
|
t0.unblock();
|
||||||
// will run to completion because are become idle
|
// will run to completion because are become idle
|
||||||
thread.join();
|
thread.join();
|
||||||
assertThat(_ewyk.isIdle(),is(true));
|
assertThat(_ewyk.isIdle(),equalTo(true));
|
||||||
|
|
||||||
// because we are idle, dispatched thread is noop
|
// because we are idle, dispatched thread is noop
|
||||||
_ewyk.run();
|
_ewyk.run();
|
||||||
|
@ -318,15 +316,15 @@ public class ExecuteProduceRunTest
|
||||||
final Task t1 = new Task(true);
|
final Task t1 = new Task(true);
|
||||||
_produce.add(t1);
|
_produce.add(t1);
|
||||||
t1.awaitRun();
|
t1.awaitRun();
|
||||||
assertThat(t1.getThread(),is(thread0));
|
assertThat(t1.getThread(),equalTo(thread0));
|
||||||
|
|
||||||
// Should NOT have dispatched another helper, because the last is still pending
|
// Should NOT have dispatched another helper, because the last is still pending
|
||||||
assertThat(_executions.size(),is(0));
|
assertThat(_executions.size(),equalTo(0));
|
||||||
|
|
||||||
// When the dispatched thread turns up, it will see the second idle
|
// When the dispatched thread turns up, it will see the second idle
|
||||||
_produce.add(NULLTASK);
|
_produce.add(NULLTASK);
|
||||||
_ewyk.run();
|
_ewyk.run();
|
||||||
assertThat(_ewyk.isIdle(),is(true));
|
assertThat(_ewyk.isIdle(),equalTo(true));
|
||||||
|
|
||||||
// So that when t1 completes it does not produce again.
|
// So that when t1 completes it does not produce again.
|
||||||
t1.unblock();
|
t1.unblock();
|
||||||
|
|
Loading…
Reference in New Issue