From c81dcfc79059b6e0a3a0bfd22cb5b5633dd3291b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Mon, 1 Feb 2016 17:10:24 +0100 Subject: [PATCH] 486930 - Selector does not correctly handle rejected execution exception This fix work in two ways: 1) Both the PEC and EPC strategies when confronted with a RejectedExecutionException will continue to Produce rather than consume. 2) If a produced Runnable cannot be consumed and it supports the new Rejectable interface, then it's reject() method is called by the producer thread. Typically this is implemented to close the connection - with the risk being that the close might block, but that is probably better than leaking the connection? --- .../org/eclipse/jetty/io/ManagedSelector.java | 19 ++- .../jetty/io/SelectChannelEndPoint.java | 24 +++- .../jetty/io/SelectChannelEndPointTest.java | 116 ++++++++++++++++++ .../jetty/util/thread/ExecutionStrategy.java | 10 ++ .../strategy/ExecuteProduceConsume.java | 38 +++++- .../strategy/ProduceExecuteConsume.java | 22 +++- 6 files changed, 220 insertions(+), 9 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 546095cf700..0af682740f3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -44,6 +44,7 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -541,7 +542,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - class Accept implements Runnable + class Accept implements Runnable, Rejectable { private final SocketChannel channel; private final Object attachment; @@ -552,6 +553,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump this.attachment = attachment; } + @Override + public void reject() + { + LOG.debug("rejected accept {}",channel); + closeNoExceptions(channel); + } + @Override public void run() { @@ -568,7 +576,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - private class CreateEndPoint implements Product + private class CreateEndPoint implements Product, Rejectable { private final SocketChannel channel; private final SelectionKey key; @@ -593,6 +601,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } + @Override + public void reject() + { + LOG.debug("rejected create {}",channel); + closeNoExceptions(channel); + } + protected void failed(Throwable failure) { closeNoExceptions(channel); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index e7587bd1af0..435c81621a0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -67,7 +68,24 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel return SelectChannelEndPoint.this.toString()+":runUpdateKey"; } }; - private final Runnable _runFillable = new Runnable() + + private abstract class RejectableRunnable implements Runnable,Rejectable + { + @Override + public void reject() + { + try + { + close(); + } + catch (Throwable x) + { + LOG.warn(x); + } + } + } + + private final Runnable _runFillable = new RejectableRunnable() { @Override public void run() @@ -81,7 +99,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel return SelectChannelEndPoint.this.toString()+":runFillable"; } }; - private final Runnable _runCompleteWrite = new Runnable() + private final Runnable _runCompleteWrite = new RejectableRunnable() { @Override public void run() @@ -95,7 +113,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel return SelectChannelEndPoint.this.toString()+":runCompleteWrite"; } }; - private final Runnable _runFillableCompleteWrite = new Runnable() + private final Runnable _runFillableCompleteWrite = new RejectableRunnable() { @Override public void run() diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 08f1630ae19..de4a5d2eb7f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -36,9 +36,12 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -48,6 +51,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.TimerScheduler; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -121,10 +125,18 @@ public class SelectChannelEndPointTest ByteBuffer _in = BufferUtil.allocate(32 * 1024); ByteBuffer _out = BufferUtil.allocate(32 * 1024); long _last = -1; + final CountDownLatch _latch; public TestConnection(EndPoint endp) { super(endp, _threadPool); + _latch=null; + } + + public TestConnection(EndPoint endp,CountDownLatch latch) + { + super(endp, _threadPool); + _latch=latch; } @Override @@ -150,6 +162,18 @@ public class SelectChannelEndPointTest @Override public void onFillable() { + if (_latch!=null) + { + try + { + _latch.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + Callback blocking = _blockingRead; if (blocking!=null) { @@ -668,4 +692,96 @@ public class SelectChannelEndPointTest } assertFalse(server.isOpen()); } + + + @Test + public void testRejectedExecution() throws Exception + { + _manager.stop(); + _threadPool.stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + BlockingQueue q = new ArrayBlockingQueue<>(4); + _threadPool = new QueuedThreadPool(4,4,60000,q); + _manager = new SelectorManager(_threadPool, _scheduler, 1) + { + @Override + public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) + { + return new TestConnection(endpoint,latch); + } + + @Override + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + { + SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000); + _lastEndPoint = endp; + _lastEndPointLatch.countDown(); + return endp; + } + }; + + _threadPool.start(); + _manager.start(); + + AtomicInteger timeout = new AtomicInteger(); + AtomicInteger rejections = new AtomicInteger(); + AtomicInteger echoed = new AtomicInteger(); + + CountDownLatch closed = new CountDownLatch(10); + for (int i=0;i<10;i++) + { + new Thread() + { + public void run() + { + try(Socket client = newClient();) + { + client.setSoTimeout(5000); + + SocketChannel server = _connector.accept(); + server.configureBlocking(false); + + _manager.accept(server); + + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); + client.getOutputStream().flush(); + client.shutdownOutput(); + + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } + assertEquals(-1,client.getInputStream().read()); + echoed.incrementAndGet(); + } + catch(SocketTimeoutException x) + { + x.printStackTrace(); + timeout.incrementAndGet(); + } + catch(Throwable x) + { + rejections.incrementAndGet(); + } + finally + { + closed.countDown(); + } + } + }.start(); + } + + latch.countDown(); + closed.await(); + Assert.assertThat(rejections.get(),Matchers.greaterThan(0)); + Assert.assertThat(rejections.get(),Matchers.lessThan(10)); + Assert.assertThat(timeout.get(),Matchers.equalTo(0)); + Assert.assertThat(echoed.get(),Matchers.equalTo(10-rejections.get())); + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java index eef09108928..31a17c82cb5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.util.thread; import java.lang.reflect.Constructor; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.util.Loader; import org.eclipse.jetty.util.log.Log; @@ -53,6 +54,15 @@ public interface ExecutionStrategy */ public void execute(); + + /** + * A task that can handle {@link RejectedExecutionException} + */ + public interface Rejectable + { + public void reject(); + } + /** *

A producer of {@link Runnable} tasks to run.

*

The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index d3053221b0b..c3436ad35d8 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.util.thread.strategy; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -140,7 +141,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable while (_threadpool!=null && _threadpool.isLowOnThreads()) { LOG.debug("EWYK low resources {}",this); - _lowresources.execute(); + try + { + _lowresources.execute(); + } + catch(Throwable e) + { + LOG.warn(e); + } } // no longer low resources so produceAndRun normally @@ -204,13 +212,37 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable // Spawn a new thread to continue production by running the produce loop. if (LOG.isDebugEnabled()) LOG.debug("{} dispatch",this); - _executor.execute(this); + try + { + _executor.execute(this); + } + catch(RejectedExecutionException e) + { + // If we cannot execute, the close or discard the task and keep producing + LOG.debug(e); + LOG.warn("RejectedExecution {}",task); + try + { + if (task instanceof Rejectable) + ((Rejectable)task).reject(); + } + catch (Exception x) + { + e.addSuppressed(x); + LOG.warn(e); + } + finally + { + task=null; + } + } } // Run the task. if (LOG.isDebugEnabled()) LOG.debug("{} run {}",this,task); - task.run(); + if (task != null) + task.run(); if (LOG.isDebugEnabled()) LOG.debug("{} ran {}",this,task); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java index 64903a6fbd2..6e7c95e5714 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.util.thread.strategy; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -55,7 +56,26 @@ public class ProduceExecuteConsume implements ExecutionStrategy break; // Execute the task. - _executor.execute(task); + try + { + _executor.execute(task); + } + catch(RejectedExecutionException e) + { + // Close or discard tasks that cannot be executed + if (task instanceof Rejectable) + { + try + { + ((Rejectable)task).reject(); + } + catch (Throwable x) + { + e.addSuppressed(x); + LOG.warn(e); + } + } + } } }