diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index 799e772cb84..a61d9a5224a 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -53,7 +53,7 @@ public class HttpChannelOverFCGI extends HttpChannel public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport) { super(connector, configuration, endPoint, transport); - this.dispatcher = new Dispatcher(connector.getExecutor(), this); + this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this); } protected void header(HttpField field) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index be319f5c4a4..2c732124cce 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -94,21 +94,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor _state = new HttpChannelState(this); _request = new Request(this, newHttpInput(_state)); _response = new Response(this, newHttpOutput()); - - if (connector==null) - { - // Testing mode - _executor=null; - _requestLog=null; - } - else - { - Server server=_connector.getServer(); - _executor=server.getThreadPool(); - _requestLog=server.getRequestLog(); - } - - _requestLog=_connector==null?null:_connector.getServer().getRequestLog(); + + _executor = connector == null ? null : connector.getServer().getThreadPool(); + _requestLog = connector == null ? null : connector.getServer().getRequestLog(); + if (LOG.isDebugEnabled()) LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state); } @@ -316,7 +305,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor try { _request.setDispatcherType(DispatcherType.REQUEST); - + List customizers = _configuration.getCustomizers(); if (!customizers.isEmpty()) { @@ -605,7 +594,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor // to hopefull somebody that can handle throw new BadMessageException(status,reason); } - + try { if (action==Action.DISPATCH) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 1f8c8b4857f..4bb8f90db9a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; import java.util.Objects; -import java.util.Queue; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import javax.servlet.ReadListener; @@ -119,7 +119,9 @@ public class HttpInput extends ServletInputStream implements Runnable private void wake() { - _channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel()); + HttpChannel channel = _channelState.getHttpChannel(); + Executor executor = channel.getConnector().getServer().getThreadPool(); + executor.execute(channel); } @@ -396,7 +398,7 @@ public class HttpInput extends ServletInputStream implements Runnable return woken; } - + /** * Adds some content to this input stream. * diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java index e9fce86538f..ed71f1ae614 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LowResourceMonitor.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.ThreadPool; -/** +/** * A monitor for low resources *

* An instance of this class will monitor all the connectors of a server (or a set of connectors @@ -254,11 +254,12 @@ public class LowResourceMonitor extends AbstractLifeCycle String reasons=null; String cause=""; int connections=0; - - if (_monitorThreads && _server.getThreadPool().isLowOnThreads()) + + ThreadPool serverThreads = _server.getThreadPool(); + if (_monitorThreads && serverThreads.isLowOnThreads()) { - reasons=low(reasons,"Low on threads: "+_server.getThreadPool()); - cause+="T"; + reasons=low(reasons,"Server low on threads: "+serverThreads); + cause+="S"; } for(Connector connector : getMonitoredOrServerConnectors()) @@ -266,12 +267,12 @@ public class LowResourceMonitor extends AbstractLifeCycle connections+=connector.getConnectedEndPoints().size(); Executor executor = connector.getExecutor(); - if (executor instanceof ThreadPool && executor!=_server.getThreadPool()) + if (executor instanceof ThreadPool && executor!=serverThreads) { - ThreadPool threadpool=(ThreadPool) executor; - if (_monitorThreads && threadpool.isLowOnThreads()) + ThreadPool connectorThreads=(ThreadPool)executor; + if (_monitorThreads && connectorThreads.isLowOnThreads()) { - reasons=low(reasons,"Low on threads: "+threadpool); + reasons=low(reasons,"Connector low on threads: "+connectorThreads); cause+="T"; } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java index 0971944df8b..debc55baf04 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java @@ -34,6 +34,7 @@ public class ProduceConsume implements ExecutionStrategy, Runnable private final Producer _producer; private final Executor _executor; + private State _state = State.IDLE; public ProduceConsume(Producer producer, Executor executor) { @@ -44,16 +45,31 @@ public class ProduceConsume implements ExecutionStrategy, Runnable @Override public void execute() { + synchronized (this) + { + _state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE; + if (_state == State.EXECUTE) + return; + } + // Iterate until we are complete. while (true) { // Produce a task. Runnable task = _producer.produce(); if (LOG.isDebugEnabled()) - LOG.debug("{} produced {}", this, task); + LOG.debug("{} produced {}", _producer, task); if (task == null) - break; + { + synchronized (this) + { + _state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE; + if (_state == State.PRODUCE) + continue; + return; + } + } // Run the task. task.run(); @@ -80,4 +96,9 @@ public class ProduceConsume implements ExecutionStrategy, Runnable return new ProduceConsume(producer, executor); } } + + private enum State + { + IDLE, PRODUCE, EXECUTE + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java index ab7ddc171b7..f9400d50d9f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java @@ -35,6 +35,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy private final Producer _producer; private final Executor _executor; + private State _state = State.IDLE; public ProduceExecuteConsume(Producer producer, Executor executor) { @@ -45,6 +46,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy @Override public void execute() { + synchronized (this) + { + _state = _state == State.IDLE ? State.PRODUCE : State.EXECUTE; + if (_state == State.EXECUTE) + return; + } + // Iterate until we are complete. while (true) { @@ -54,7 +62,15 @@ public class ProduceExecuteConsume implements ExecutionStrategy LOG.debug("{} produced {}", _producer, task); if (task == null) - break; + { + synchronized (this) + { + _state = _state == State.PRODUCE ? State.IDLE : State.PRODUCE; + if (_state == State.PRODUCE) + continue; + return; + } + } // Execute the task. try @@ -94,4 +110,9 @@ public class ProduceExecuteConsume implements ExecutionStrategy return new ProduceExecuteConsume(producer, executor); } } + + private enum State + { + IDLE, PRODUCE, EXECUTE + } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRunTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java similarity index 78% rename from jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRunTest.java rename to jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java index bfa15dd3164..af0c7d5671b 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceRunTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java @@ -19,10 +19,6 @@ package org.eclipse.jetty.util.thread.strategy; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -36,60 +32,46 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class ExecuteProduceRunTest +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class ExecuteProduceConsumeTest { - private final Runnable NULLTASK = new Runnable() - { - @Override - public void run() - { - } - }; - - - ExecuteProduceConsume _ewyk; - final BlockingQueue _produce = new BlockingArrayQueue<>(); - final Queue _executions = new ConcurrentArrayQueue<>(); - volatile Thread _producer; - + private static final Runnable NULLTASK = () -> {}; + + private final BlockingQueue _produce = new BlockingArrayQueue<>(); + private final Queue _executions = new ConcurrentArrayQueue<>(); + private ExecuteProduceConsume _ewyk; + private volatile Thread _producer; + @Before public void before() { _executions.clear(); - - Producer producer = new Producer() + + Producer producer = () -> { - @Override - public Runnable produce() + try { - try - { - _producer=Thread.currentThread(); - Runnable task= _produce.take(); - if (task==NULLTASK) - return null; - return task; - } - catch(InterruptedException e) - { - e.printStackTrace(); + _producer=Thread.currentThread(); + Runnable task= _produce.take(); + if (task==NULLTASK) return null; - } - finally - { - _producer=null; - } + return task; + } + catch(InterruptedException e) + { + e.printStackTrace(); + return null; + } + finally + { + _producer=null; } }; - Executor executor = new Executor() - { - @Override - public void execute(Runnable task) - { - _executions.add(task); - } - }; + Executor executor = _executions::add; _ewyk = new ExecuteProduceConsume(producer,executor); } @@ -101,14 +83,14 @@ public class ExecuteProduceRunTest assertThat(_produce.size(),equalTo(0)); assertThat(_executions.size(),equalTo(0)); } - + @Test public void testIdle() { _produce.add(NULLTASK); _ewyk.execute(); } - + @Test public void testProduceOneNonBlockingTask() { @@ -119,23 +101,22 @@ public class ExecuteProduceRunTest assertThat(t0.hasRun(),equalTo(true)); Assert.assertEquals(_ewyk,_executions.poll()); } - + @Test public void testProduceManyNonBlockingTask() { - Task[] t = new Task[10]; - for (int i=0;i