Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-05-04 11:47:54 +02:00
commit 4d4ecfd5cf
7 changed files with 150 additions and 136 deletions

View File

@ -53,7 +53,7 @@ public class HttpChannelOverFCGI extends HttpChannel
public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
{
super(connector, configuration, endPoint, transport);
this.dispatcher = new Dispatcher(connector.getExecutor(), this);
this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this);
}
protected void header(HttpField field)

View File

@ -94,21 +94,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_state = new HttpChannelState(this);
_request = new Request(this, newHttpInput(_state));
_response = new Response(this, newHttpOutput());
if (connector==null)
{
// Testing mode
_executor=null;
_requestLog=null;
}
else
{
Server server=_connector.getServer();
_executor=server.getThreadPool();
_requestLog=server.getRequestLog();
}
_requestLog=_connector==null?null:_connector.getServer().getRequestLog();
_executor = connector == null ? null : connector.getServer().getThreadPool();
_requestLog = connector == null ? null : connector.getServer().getRequestLog();
if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state);
}
@ -316,7 +305,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
try
{
_request.setDispatcherType(DispatcherType.REQUEST);
List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
if (!customizers.isEmpty())
{
@ -605,7 +594,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
// to hopefull somebody that can handle
throw new BadMessageException(status,reason);
}
try
{
if (action==Action.DISPATCH)

View File

@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.servlet.ReadListener;
@ -119,7 +119,9 @@ public class HttpInput extends ServletInputStream implements Runnable
private void wake()
{
_channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
HttpChannel channel = _channelState.getHttpChannel();
Executor executor = channel.getConnector().getServer().getThreadPool();
executor.execute(channel);
}
@ -396,7 +398,7 @@ public class HttpInput extends ServletInputStream implements Runnable
return woken;
}
/**
* Adds some content to this input stream.
*

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
/**
* A monitor for low resources
* <p>
* An instance of this class will monitor all the connectors of a server (or a set of connectors
@ -254,11 +254,12 @@ public class LowResourceMonitor extends AbstractLifeCycle
String reasons=null;
String cause="";
int connections=0;
if (_monitorThreads && _server.getThreadPool().isLowOnThreads())
ThreadPool serverThreads = _server.getThreadPool();
if (_monitorThreads && serverThreads.isLowOnThreads())
{
reasons=low(reasons,"Low on threads: "+_server.getThreadPool());
cause+="T";
reasons=low(reasons,"Server low on threads: "+serverThreads);
cause+="S";
}
for(Connector connector : getMonitoredOrServerConnectors())
@ -266,12 +267,12 @@ public class LowResourceMonitor extends AbstractLifeCycle
connections+=connector.getConnectedEndPoints().size();
Executor executor = connector.getExecutor();
if (executor instanceof ThreadPool && executor!=_server.getThreadPool())
if (executor instanceof ThreadPool && executor!=serverThreads)
{
ThreadPool threadpool=(ThreadPool) executor;
if (_monitorThreads && threadpool.isLowOnThreads())
ThreadPool connectorThreads=(ThreadPool)executor;
if (_monitorThreads && connectorThreads.isLowOnThreads())
{
reasons=low(reasons,"Low on threads: "+threadpool);
reasons=low(reasons,"Connector low on threads: "+connectorThreads);
cause+="T";
}
}

View File

@ -34,6 +34,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
private final Producer _producer;
private final Executor _executor;
private State _state = State.IDLE;
public ProduceConsume(Producer producer, Executor executor)
{
@ -44,16 +45,31 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
@Override
public void execute()
{
synchronized (this)
{
_state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE;
if (_state == State.EXECUTE)
return;
}
// Iterate until we are complete.
while (true)
{
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);
LOG.debug("{} produced {}", _producer, task);
if (task == null)
break;
{
synchronized (this)
{
_state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE;
if (_state == State.PRODUCE)
continue;
return;
}
}
// Run the task.
task.run();
@ -80,4 +96,9 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
return new ProduceConsume(producer, executor);
}
}
private enum State
{
IDLE, PRODUCE, EXECUTE
}
}

View File

@ -35,6 +35,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
private final Producer _producer;
private final Executor _executor;
private State _state = State.IDLE;
public ProduceExecuteConsume(Producer producer, Executor executor)
{
@ -45,6 +46,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
@Override
public void execute()
{
synchronized (this)
{
_state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE;
if (_state == State.EXECUTE)
return;
}
// Iterate until we are complete.
while (true)
{
@ -54,7 +62,15 @@ public class ProduceExecuteConsume implements ExecutionStrategy
LOG.debug("{} produced {}", _producer, task);
if (task == null)
break;
{
synchronized (this)
{
_state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE;
if (_state == State.PRODUCE)
continue;
return;
}
}
// Execute the task.
try
@ -94,4 +110,9 @@ public class ProduceExecuteConsume implements ExecutionStrategy
return new ProduceExecuteConsume(producer, executor);
}
}
private enum State
{
IDLE, PRODUCE, EXECUTE
}
}

View File

@ -19,10 +19,6 @@
package org.eclipse.jetty.util.thread.strategy;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@ -36,60 +32,46 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ExecuteProduceRunTest
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class ExecuteProduceConsumeTest
{
private final Runnable NULLTASK = new Runnable()
{
@Override
public void run()
{
}
};
ExecuteProduceConsume _ewyk;
final BlockingQueue<Runnable> _produce = new BlockingArrayQueue<>();
final Queue<Runnable> _executions = new ConcurrentArrayQueue<>();
volatile Thread _producer;
private static final Runnable NULLTASK = () -> {};
private final BlockingQueue<Runnable> _produce = new BlockingArrayQueue<>();
private final Queue<Runnable> _executions = new ConcurrentArrayQueue<>();
private ExecuteProduceConsume _ewyk;
private volatile Thread _producer;
@Before
public void before()
{
_executions.clear();
Producer producer = new Producer()
Producer producer = () ->
{
@Override
public Runnable produce()
try
{
try
{
_producer=Thread.currentThread();
Runnable task= _produce.take();
if (task==NULLTASK)
return null;
return task;
}
catch(InterruptedException e)
{
e.printStackTrace();
_producer=Thread.currentThread();
Runnable task= _produce.take();
if (task==NULLTASK)
return null;
}
finally
{
_producer=null;
}
return task;
}
catch(InterruptedException e)
{
e.printStackTrace();
return null;
}
finally
{
_producer=null;
}
};
Executor executor = new Executor()
{
@Override
public void execute(Runnable task)
{
_executions.add(task);
}
};
Executor executor = _executions::add;
_ewyk = new ExecuteProduceConsume(producer,executor);
}
@ -101,14 +83,14 @@ public class ExecuteProduceRunTest
assertThat(_produce.size(),equalTo(0));
assertThat(_executions.size(),equalTo(0));
}
@Test
public void testIdle()
{
_produce.add(NULLTASK);
_ewyk.execute();
}
@Test
public void testProduceOneNonBlockingTask()
{
@ -119,23 +101,22 @@ public class ExecuteProduceRunTest
assertThat(t0.hasRun(),equalTo(true));
Assert.assertEquals(_ewyk,_executions.poll());
}
@Test
public void testProduceManyNonBlockingTask()
{
Task[] t = new Task[10];
for (int i=0;i<t.length;i++)
Task[] tasks = new Task[10];
for (int i=0;i<tasks.length;i++)
{
t[i]=new Task();
_produce.add(t[i]);
tasks[i]=new Task();
_produce.add(tasks[i]);
}
_produce.add(NULLTASK);
_ewyk.execute();
for (int i=0;i<t.length;i++)
assertThat(t[i].hasRun(),equalTo(true));
for (Task task : tasks)
assertThat(task.hasRun(), equalTo(true));
Assert.assertEquals(_ewyk,_executions.poll());
}
@Test
@ -153,18 +134,18 @@ public class ExecuteProduceRunTest
}
};
thread.start();
// wait for execute thread to block in
// wait for execute thread to block in
t0.awaitRun();
assertEquals(thread,t0.getThread());
// Should have dispatched only one helper
// Should have dispatched only one helper
assertEquals(_ewyk,_executions.poll());
// which is make us idle
_ewyk.run();
assertThat(_ewyk.isIdle(),equalTo(true));
// unblock task
t0.unblock();
// will run to completion because are already idle
@ -186,13 +167,13 @@ public class ExecuteProduceRunTest
}
};
thread.start();
// wait for execute thread to block in
// wait for execute thread to block in
t0.awaitRun();
// Should have dispatched only one helper
// Should have dispatched only one helper
Assert.assertEquals(_ewyk,_executions.poll());
// unblock task
t0.unblock();
// will run to completion because are become idle
@ -217,31 +198,31 @@ public class ExecuteProduceRunTest
}
};
thread0.start();
// wait for execute thread to block in task
t0.awaitRun();
assertEquals(thread0,t0.getThread());
// Should have dispatched another helper
// Should have dispatched another helper
Assert.assertEquals(_ewyk,_executions.poll());
// dispatched thread will block in produce
// dispatched thread will block in produce
Thread thread1 = new Thread(_ewyk);
thread1.start();
// Spin
while(_producer==null)
Thread.yield();
// thread1 is blocked in producing
assertEquals(thread1,_producer);
// because we are producing, any other dispatched threads are noops
_ewyk.run();
// ditto with execute
_ewyk.execute();
// Now if unblock the production by the dispatched thread
final Task t1 = new Task(true);
_produce.add(t1);
@ -249,28 +230,28 @@ public class ExecuteProduceRunTest
// task will be run by thread1
t1.awaitRun();
assertEquals(thread1,t1.getThread());
// and another thread will have been requested
Assert.assertEquals(_ewyk,_executions.poll());
// If we unblock t1, it will overtake t0 and try to produce again!
t1.unblock();
// Now thread1 is producing again
while(_producer==null)
Thread.yield();
assertEquals(thread1,_producer);
// If we unblock t0, it will decide it is not needed
t0.unblock();
thread0.join();
// If the requested extra thread turns up, it is also noop because we are producing
_ewyk.run();
// Give the idle job
_produce.add(NULLTASK);
// Which will eventually idle the producer
thread1.join();
assertEquals(null,_producer);
@ -290,29 +271,29 @@ public class ExecuteProduceRunTest
}
};
thread0.start();
// wait for execute thread to block in task
t0.awaitRun();
assertEquals(thread0,t0.getThread());
// Should have dispatched another helper
// Should have dispatched another helper
Assert.assertEquals(_ewyk,_executions.poll());
// We will go idle when we next produce
_produce.add(NULLTASK);
// execute will return immediately because it did not yet see the idle.
_ewyk.execute();
// When we unblock t0, thread1 will see the idle,
// When we unblock t0, thread1 will see the idle,
t0.unblock();
// but because there was a pending execute it will try producing again
while(_producer==null)
Thread.yield();
assertEquals(thread0,_producer);
// and will see new tasks
// and will see new tasks
final Task t1 = new Task(true);
_produce.add(t1);
t1.awaitRun();
@ -320,35 +301,34 @@ public class ExecuteProduceRunTest
// Should NOT have dispatched another helper, because the last is still pending
assertThat(_executions.size(),equalTo(0));
// When the dispatched thread turns up, it will see the second idle
_produce.add(NULLTASK);
_ewyk.run();
assertThat(_ewyk.isIdle(),equalTo(true));
// So that when t1 completes it does not produce again.
t1.unblock();
thread0.join();
}
public static class Task implements Runnable
private static class Task implements Runnable
{
final CountDownLatch _block = new CountDownLatch(1);
final CountDownLatch _run = new CountDownLatch(1);
volatile Thread _thread;
private final CountDownLatch _block = new CountDownLatch(1);
private final CountDownLatch _run = new CountDownLatch(1);
private volatile Thread _thread;
public Task()
{
this(false);
}
public Task(boolean block)
{
if (!block)
_block.countDown();
}
@Override
public void run()
{
@ -367,12 +347,12 @@ public class ExecuteProduceRunTest
_thread=null;
}
}
public boolean hasRun()
{
return _run.getCount()<=0;
}
public void awaitRun()
{
try
@ -384,12 +364,12 @@ public class ExecuteProduceRunTest
throw new IllegalStateException(e);
}
}
public void unblock()
{
_block.countDown();
}
public Thread getThread()
{
return _thread;