486930 - Selector does not correctly handle rejected execution exception

This fix work in two ways:

1) Both the PEC and EPC strategies when confronted with a
RejectedExecutionException will continue to Produce rather than consume.

2) If a produced Runnable cannot be consumed and it supports the new Rejectable interface,
then it's reject() method is called by the producer thread.    Typically this is implemented
to close the connection - with the risk being that the close might block, but that is
probably better than leaking the connection?
This commit is contained in:
Greg Wilkins 2016-02-01 17:10:24 +01:00
parent 2902a13463
commit c81dcfc790
6 changed files with 220 additions and 9 deletions

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -541,7 +542,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
} }
class Accept implements Runnable class Accept implements Runnable, Rejectable
{ {
private final SocketChannel channel; private final SocketChannel channel;
private final Object attachment; private final Object attachment;
@ -552,6 +553,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
this.attachment = attachment; this.attachment = attachment;
} }
@Override
public void reject()
{
LOG.debug("rejected accept {}",channel);
closeNoExceptions(channel);
}
@Override @Override
public void run() public void run()
{ {
@ -568,7 +576,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
} }
private class CreateEndPoint implements Product private class CreateEndPoint implements Product, Rejectable
{ {
private final SocketChannel channel; private final SocketChannel channel;
private final SelectionKey key; private final SelectionKey key;
@ -593,6 +601,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
} }
} }
@Override
public void reject()
{
LOG.debug("rejected create {}",channel);
closeNoExceptions(channel);
}
protected void failed(Throwable failure) protected void failed(Throwable failure)
{ {
closeNoExceptions(channel); closeNoExceptions(channel);

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -67,7 +68,24 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runUpdateKey"; return SelectChannelEndPoint.this.toString()+":runUpdateKey";
} }
}; };
private final Runnable _runFillable = new Runnable()
private abstract class RejectableRunnable implements Runnable,Rejectable
{
@Override
public void reject()
{
try
{
close();
}
catch (Throwable x)
{
LOG.warn(x);
}
}
}
private final Runnable _runFillable = new RejectableRunnable()
{ {
@Override @Override
public void run() public void run()
@ -81,7 +99,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runFillable"; return SelectChannelEndPoint.this.toString()+":runFillable";
} }
}; };
private final Runnable _runCompleteWrite = new Runnable() private final Runnable _runCompleteWrite = new RejectableRunnable()
{ {
@Override @Override
public void run() public void run()
@ -95,7 +113,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runCompleteWrite"; return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
} }
}; };
private final Runnable _runFillableCompleteWrite = new Runnable() private final Runnable _runFillableCompleteWrite = new RejectableRunnable()
{ {
@Override @Override
public void run() public void run()

View File

@ -36,9 +36,12 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -48,6 +51,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -121,10 +125,18 @@ public class SelectChannelEndPointTest
ByteBuffer _in = BufferUtil.allocate(32 * 1024); ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024); ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1; long _last = -1;
final CountDownLatch _latch;
public TestConnection(EndPoint endp) public TestConnection(EndPoint endp)
{ {
super(endp, _threadPool); super(endp, _threadPool);
_latch=null;
}
public TestConnection(EndPoint endp,CountDownLatch latch)
{
super(endp, _threadPool);
_latch=latch;
} }
@Override @Override
@ -150,6 +162,18 @@ public class SelectChannelEndPointTest
@Override @Override
public void onFillable() public void onFillable()
{ {
if (_latch!=null)
{
try
{
_latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
Callback blocking = _blockingRead; Callback blocking = _blockingRead;
if (blocking!=null) if (blocking!=null)
{ {
@ -668,4 +692,96 @@ public class SelectChannelEndPointTest
} }
assertFalse(server.isOpen()); assertFalse(server.isOpen());
} }
@Test
public void testRejectedExecution() throws Exception
{
_manager.stop();
_threadPool.stop();
final CountDownLatch latch = new CountDownLatch(1);
BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
_threadPool = new QueuedThreadPool(4,4,60000,q);
_manager = new SelectorManager(_threadPool, _scheduler, 1)
{
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
return new TestConnection(endpoint,latch);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
_lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp;
}
};
_threadPool.start();
_manager.start();
AtomicInteger timeout = new AtomicInteger();
AtomicInteger rejections = new AtomicInteger();
AtomicInteger echoed = new AtomicInteger();
CountDownLatch closed = new CountDownLatch(10);
for (int i=0;i<10;i++)
{
new Thread()
{
public void run()
{
try(Socket client = newClient();)
{
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
client.getOutputStream().flush();
client.shutdownOutput();
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
assertEquals(-1,client.getInputStream().read());
echoed.incrementAndGet();
}
catch(SocketTimeoutException x)
{
x.printStackTrace();
timeout.incrementAndGet();
}
catch(Throwable x)
{
rejections.incrementAndGet();
}
finally
{
closed.countDown();
}
}
}.start();
}
latch.countDown();
closed.await();
Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
Assert.assertThat(rejections.get(),Matchers.lessThan(10));
Assert.assertThat(timeout.get(),Matchers.equalTo(0));
Assert.assertThat(echoed.get(),Matchers.equalTo(10-rejections.get()));
}
} }

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.util.thread;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.Loader; import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -53,6 +54,15 @@ public interface ExecutionStrategy
*/ */
public void execute(); public void execute();
/**
* A task that can handle {@link RejectedExecutionException}
*/
public interface Rejectable
{
public void reject();
}
/** /**
* <p>A producer of {@link Runnable} tasks to run.</p> * <p>A producer of {@link Runnable} tasks to run.</p>
* <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until * <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -140,7 +141,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
while (_threadpool!=null && _threadpool.isLowOnThreads()) while (_threadpool!=null && _threadpool.isLowOnThreads())
{ {
LOG.debug("EWYK low resources {}",this); LOG.debug("EWYK low resources {}",this);
_lowresources.execute(); try
{
_lowresources.execute();
}
catch(Throwable e)
{
LOG.warn(e);
}
} }
// no longer low resources so produceAndRun normally // no longer low resources so produceAndRun normally
@ -204,13 +212,37 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this); LOG.debug("{} dispatch",this);
_executor.execute(this); try
{
_executor.execute(this);
}
catch(RejectedExecutionException e)
{
// If we cannot execute, the close or discard the task and keep producing
LOG.debug(e);
LOG.warn("RejectedExecution {}",task);
try
{
if (task instanceof Rejectable)
((Rejectable)task).reject();
}
catch (Exception x)
{
e.addSuppressed(x);
LOG.warn(e);
}
finally
{
task=null;
}
}
} }
// Run the task. // Run the task.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} run {}",this,task); LOG.debug("{} run {}",this,task);
task.run(); if (task != null)
task.run();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} ran {}",this,task); LOG.debug("{} ran {}",this,task);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -55,7 +56,26 @@ public class ProduceExecuteConsume implements ExecutionStrategy
break; break;
// Execute the task. // Execute the task.
_executor.execute(task); try
{
_executor.execute(task);
}
catch(RejectedExecutionException e)
{
// Close or discard tasks that cannot be executed
if (task instanceof Rejectable)
{
try
{
((Rejectable)task).reject();
}
catch (Throwable x)
{
e.addSuppressed(x);
LOG.warn(e);
}
}
}
} }
} }