From 3b34423b17be8b7787ede0689b3bc94e0670ba22 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Thu, 18 Dec 2014 17:06:32 +0100
Subject: [PATCH] EWYK SelectorManager
---
.../jetty/client/ssl/SslBytesServerTest.java | 2 +-
.../eclipse/jetty/io/AbstractConnection.java | 3 +-
.../org/eclipse/jetty/io/ManagedSelector.java | 417 +++++++++---------
.../org/eclipse/jetty/io/SelectorManager.java | 27 +-
.../SelectChannelEndPointInterestsTest.java | 2 +-
.../jetty/io/SelectChannelEndPointTest.java | 2 +-
.../eclipse/jetty/io/SelectorManagerTest.java | 2 +-
.../eclipse/jetty/io/SslConnectionTest.java | 2 +-
.../test/resources/jetty-logging.properties | 3 +
.../jetty/server/HttpConnectionFactory.java | 8 +-
.../jetty/server/ExtendedServerTest.java | 2 +-
.../SlowClientWithPipelinedRequestTest.java | 2 +-
.../server/proxy/ProxyHTTPSPDYConnection.java | 2 +-
.../jetty/util/thread/NonBlockingThread.java | 59 ---
14 files changed, 221 insertions(+), 312 deletions(-)
delete mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/NonBlockingThread.java
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java
index 6442b2d75eb..4d813eff12a 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java
@@ -111,7 +111,7 @@ public class SslBytesServerTest extends SslBytesTest
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
- return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, true)
+ return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, false)
{
@Override
protected HttpParser newHttpParser()
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
index abaf1df7d27..b21bc833b43 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.NonBlockingThread;
/**
* A convenience base implementation of {@link Connection}.
@@ -87,7 +86,7 @@ public abstract class AbstractConnection implements Connection
protected void failedCallback(final Callback callback, final Throwable x)
{
- boolean dispatchFailure = isDispatchIO() && NonBlockingThread.isNonBlockingThread();
+ boolean dispatchFailure = isDispatchIO();
if (dispatchFailure)
{
try
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index fbaea62b868..75a39732260 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -28,6 +28,7 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -40,6 +41,9 @@ import org.eclipse.jetty.io.SelectorManager.State;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
/**
@@ -48,22 +52,23 @@ import org.eclipse.jetty.util.thread.Scheduler;
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.
*/
-public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
+public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable, ExecutionStrategy.Producer
{
- /**
- *
- */
+ protected static final Logger LOG = Log.getLogger(ManagedSelector.class);
+ private final ExecutionStrategy _strategy;
private final SelectorManager _selectorManager;
private final AtomicReference _state= new AtomicReference<>(State.PROCESSING);
private List _runChanges = new ArrayList<>();
private List _addChanges = new ArrayList<>();
private final int _id;
private Selector _selector;
- volatile Thread _thread;
+ private Set _selectedKeys;
+ private Iterator _selections;
public ManagedSelector(SelectorManager selectorManager, int id)
{
_selectorManager = selectorManager;
+ _strategy = new ExecutionStrategy.Iterative(this,selectorManager.getExecutor());
_id = id;
setStopTimeout(5000);
}
@@ -84,13 +89,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
@Override
protected void doStop() throws Exception
{
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Stopping {}", this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopping {}", this);
Stop stop = new Stop();
submit(stop);
stop.await(getStopTimeout());
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Stopped {}", this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopped {}", this);
}
@@ -108,8 +113,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
// lead to stack overflows on a busy server, so we always offer the
// change to the queue and process the state.
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Queued change {}", change);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queued change {}", change);
out: while (true)
{
@@ -151,216 +156,211 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
try
{
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Running change {}", change);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
- SelectorManager.LOG.debug("Could not run change " + change, x);
+ LOG.debug("Could not run change " + change, x);
}
}
@Override
public void run()
{
- _thread = Thread.currentThread();
- String name = _thread.getName();
- int priority = _thread.getPriority();
- try
- {
- if (_selectorManager._priorityDelta != 0)
- _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _selectorManager._priorityDelta)));
-
- _thread.setName(String.format("%s-selector-%s@%h/%d", name, _selectorManager.getClass().getSimpleName(), _selectorManager.hashCode(), _id));
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Starting {} on {}", _thread, this);
- while (isRunning())
- select();
- while (isStopping())
- select();
- }
- finally
- {
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Stopped {} on {}", _thread, this);
- _thread.setName(name);
- if (_selectorManager._priorityDelta != 0)
- _thread.setPriority(priority);
- }
+ while (isRunning() || isStopping())
+ _strategy.produce();
}
- /**
- * Process changes and waits on {@link Selector#select()}.
- *
- * @see #submit(Runnable)
- */
- public void select()
+
+ @Override
+ public Runnable produce()
{
- boolean debug = SelectorManager.LOG.isDebugEnabled();
try
{
-
- // Run the changes, and only exit if we ran all changes
- loop: while(true)
+ while (isRunning()||isStopping())
{
- State state=_state.get();
- switch (state)
+ // Do we have a selections iterator
+ if (_selections==null || !_selections.hasNext())
{
- case PROCESSING:
- // We can loop on _runChanges list without lock, because only access here.
- int size = _runChanges.size();
- for (int i=0;i tmp=_runChanges;
- _runChanges=_addChanges;
- _addChanges=tmp;
- _state.set(State.PROCESSING);
- continue;
-
-
- case LOCKED:
- Thread.yield();
- continue;
-
- default:
- throw new IllegalStateException();
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring cancelled key for channel {}", key.channel());
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ catch (Throwable x)
+ {
+ LOG.warn("Could not process key for channel " + key.channel(), x);
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ ((EndPoint)attachment).close();
+ }
}
}
-
- // Do the selecting!
- int selected;
- if (debug)
- {
- SelectorManager.LOG.debug("Selector loop waiting on select");
- selected = _selector.select();
- SelectorManager.LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
- }
- else
- selected = _selector.select();
-
- // We have finished selecting. This while loop could probably be replaced with just
- // _state.compareAndSet(State.SELECTING, State.PROCESSING)
- // since if state is locked by submit, the resulting state will be processing anyway.
- // but let's be thorough and do the full loop.
- out: while(true)
- {
- switch (_state.get())
- {
- case SELECTING:
- // we were still in selecting state, so probably have
- // selected a key, so goto processing state to handle
- if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
- continue;
- break out;
- case PROCESSING:
- // we were already in processing, so were woken up by a change being
- // submitted, so no state change needed - lets just process
- break out;
- case LOCKED:
- // A change is currently being submitted. This does not matter
- // here so much, but we will spin anyway so we don't race it later nor
- // overwrite it's state change.
- Thread.yield();
- continue;
- default:
- throw new IllegalStateException();
- }
- }
-
- // Process any selected keys
- Set selectedKeys = _selector.selectedKeys();
- for (SelectionKey key : selectedKeys)
- {
- if (key.isValid())
- {
- processKey(key);
- }
- else
- {
- if (debug)
- SelectorManager.LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
- Object attachment = key.attachment();
- if (attachment instanceof EndPoint)
- ((EndPoint)attachment).close();
- }
- }
-
- // Allow any dispatched tasks to run.
- Thread.yield();
-
- // Update the keys. This is done separately to calling processKey, so that any momentary changes
- // to the key state do not have to be submitted, as they are frequently reverted by the dispatched
- // handling threads.
- for (SelectionKey key : selectedKeys)
- {
- if (key.isValid())
- updateKey(key);
- }
-
- selectedKeys.clear();
+ return null;
}
catch (Throwable x)
{
if (isRunning())
- SelectorManager.LOG.warn(x);
+ LOG.warn(x);
else
- SelectorManager.LOG.ignore(x);
+ LOG.ignore(x);
+ return null;
}
}
-
- private void processKey(SelectionKey key)
+
+ private void runChangesAndSetSelecting()
{
- final Object attachment = key.attachment();
- try
+
+ // Run the changes, and only exit if we ran all changes
+ loop: while(true)
{
- if (attachment instanceof SelectableEndPoint)
+ State state=_state.get();
+ switch (state)
{
- Runnable task=((SelectableEndPoint)attachment).onSelected();
- if (task!=null)
- _selectorManager.getExecutor().execute(task);
+ case PROCESSING:
+ // We can loop on _runChanges list without lock, because only access here.
+ int size = _runChanges.size();
+ for (int i=0;i tmp=_runChanges;
+ _runChanges=_addChanges;
+ _addChanges=tmp;
+ _state.set(State.PROCESSING);
+ continue;
+
+ case LOCKED:
+ Thread.yield();
+ continue;
+
+ default:
+ throw new IllegalStateException();
}
- else if (key.isConnectable())
- {
- processConnect(key, (Connect)attachment);
- }
- else if (key.isAcceptable())
- {
- processAccept(key);
- }
- else
- {
- throw new IllegalStateException();
- }
- }
- catch (CancelledKeyException x)
- {
- SelectorManager.LOG.debug("Ignoring cancelled key for channel {}", key.channel());
- if (attachment instanceof EndPoint)
- closeNoExceptions((EndPoint)attachment);
- }
- catch (Throwable x)
- {
- SelectorManager.LOG.warn("Could not process key for channel " + key.channel(), x);
- if (attachment instanceof EndPoint)
- closeNoExceptions((EndPoint)attachment);
}
}
+ private void selectAndSetProcessing() throws IOException
+ {
+ // Do the selecting!
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop waiting on select");
+ int selected = _selector.select();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
+
+ // We have finished selecting. This while loop could probably be replaced with just
+ // _state.compareAndSet(State.SELECTING, State.PROCESSING)
+ // since if state is locked by submit, the resulting state will be processing anyway.
+ // but let's be thorough and do the full loop.
+ out: while(true)
+ {
+ switch (_state.get())
+ {
+ case SELECTING:
+ // we were still in selecting state, so probably have
+ // selected a key, so goto processing state to handle
+ if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
+ continue;
+ break out;
+ case PROCESSING:
+ // we were already in processing, so were woken up by a change being
+ // submitted, so no state change needed - lets just process
+ break out;
+ case LOCKED:
+ // A change is currently being submitted. This does not matter
+ // here so much, but we will spin anyway so we don't race it later nor
+ // overwrite it's state change.
+ Thread.yield();
+ continue;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ _selectedKeys = _selector.selectedKeys();
+ _selections = _selectedKeys.iterator();
+ }
+
+ @Override
+ public void onProductionComplete()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
@@ -407,7 +407,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
catch (Throwable x)
{
closeNoExceptions(channel);
- SelectorManager.LOG.warn("Accept failed for channel " + channel, x);
+ LOG.warn("Accept failed for channel " + channel, x);
}
}
@@ -420,15 +420,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
catch (Throwable x)
{
- SelectorManager.LOG.ignore(x);
+ LOG.ignore(x);
}
}
- public boolean isSelectorThread()
- {
- return Thread.currentThread() == _thread;
- }
-
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
@@ -436,15 +431,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection);
_selectorManager.connectionOpened(connection);
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Created {}", endPoint);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", endPoint);
return endPoint;
}
public void destroyEndPoint(EndPoint endPoint)
{
- if (SelectorManager.LOG.isDebugEnabled())
- SelectorManager.LOG.debug("Destroyed {}", endPoint);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Destroyed {}", endPoint);
Connection connection = endPoint.getConnection();
if (connection != null)
_selectorManager.connectionClosed(connection);
@@ -462,25 +457,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
- Thread selecting = _thread;
-
- Object where = "not selecting";
- StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
- if (trace != null)
- {
- for (StackTraceElement t : trace)
- if (t.getClassName().startsWith("org.eclipse.jetty."))
- {
- where = t;
- break;
- }
- }
-
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
final ArrayList