EWYK SelectorManager
This commit is contained in:
parent
31e06b5791
commit
3b34423b17
|
@ -111,7 +111,7 @@ public class SslBytesServerTest extends SslBytesTest
|
|||
@Override
|
||||
public Connection newConnection(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, true)
|
||||
return configure(new HttpConnection(getHttpConfiguration(), connector, endPoint, false)
|
||||
{
|
||||
@Override
|
||||
protected HttpParser newHttpParser()
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.NonBlockingThread;
|
||||
|
||||
/**
|
||||
* <p>A convenience base implementation of {@link Connection}.</p>
|
||||
|
@ -87,7 +86,7 @@ public abstract class AbstractConnection implements Connection
|
|||
|
||||
protected void failedCallback(final Callback callback, final Throwable x)
|
||||
{
|
||||
boolean dispatchFailure = isDispatchIO() && NonBlockingThread.isNonBlockingThread();
|
||||
boolean dispatchFailure = isDispatchIO();
|
||||
if (dispatchFailure)
|
||||
{
|
||||
try
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.nio.channels.Selector;
|
|||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -40,6 +41,9 @@ import org.eclipse.jetty.io.SelectorManager.State;
|
|||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
||||
/**
|
||||
|
@ -48,22 +52,23 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
|
||||
* with the channel.</p>
|
||||
*/
|
||||
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
|
||||
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable, ExecutionStrategy.Producer
|
||||
{
|
||||
/**
|
||||
*
|
||||
*/
|
||||
protected static final Logger LOG = Log.getLogger(ManagedSelector.class);
|
||||
private final ExecutionStrategy _strategy;
|
||||
private final SelectorManager _selectorManager;
|
||||
private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESSING);
|
||||
private List<Runnable> _runChanges = new ArrayList<>();
|
||||
private List<Runnable> _addChanges = new ArrayList<>();
|
||||
private final int _id;
|
||||
private Selector _selector;
|
||||
volatile Thread _thread;
|
||||
private Set<SelectionKey> _selectedKeys;
|
||||
private Iterator<SelectionKey> _selections;
|
||||
|
||||
public ManagedSelector(SelectorManager selectorManager, int id)
|
||||
{
|
||||
_selectorManager = selectorManager;
|
||||
_strategy = new ExecutionStrategy.Iterative(this,selectorManager.getExecutor());
|
||||
_id = id;
|
||||
setStopTimeout(5000);
|
||||
}
|
||||
|
@ -84,13 +89,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Stopping {}", this);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Stopping {}", this);
|
||||
Stop stop = new Stop();
|
||||
submit(stop);
|
||||
stop.await(getStopTimeout());
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Stopped {}", this);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Stopped {}", this);
|
||||
}
|
||||
|
||||
|
||||
|
@ -108,8 +113,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
// lead to stack overflows on a busy server, so we always offer the
|
||||
// change to the queue and process the state.
|
||||
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Queued change {}", change);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queued change {}", change);
|
||||
|
||||
out: while (true)
|
||||
{
|
||||
|
@ -151,216 +156,211 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
{
|
||||
try
|
||||
{
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Running change {}", change);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Running change {}", change);
|
||||
change.run();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
SelectorManager.LOG.debug("Could not run change " + change, x);
|
||||
LOG.debug("Could not run change " + change, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
_thread = Thread.currentThread();
|
||||
String name = _thread.getName();
|
||||
int priority = _thread.getPriority();
|
||||
try
|
||||
{
|
||||
if (_selectorManager._priorityDelta != 0)
|
||||
_thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _selectorManager._priorityDelta)));
|
||||
|
||||
_thread.setName(String.format("%s-selector-%s@%h/%d", name, _selectorManager.getClass().getSimpleName(), _selectorManager.hashCode(), _id));
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Starting {} on {}", _thread, this);
|
||||
while (isRunning())
|
||||
select();
|
||||
while (isStopping())
|
||||
select();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Stopped {} on {}", _thread, this);
|
||||
_thread.setName(name);
|
||||
if (_selectorManager._priorityDelta != 0)
|
||||
_thread.setPriority(priority);
|
||||
}
|
||||
while (isRunning() || isStopping())
|
||||
_strategy.produce();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Process changes and waits on {@link Selector#select()}.</p>
|
||||
*
|
||||
* @see #submit(Runnable)
|
||||
*/
|
||||
public void select()
|
||||
|
||||
@Override
|
||||
public Runnable produce()
|
||||
{
|
||||
boolean debug = SelectorManager.LOG.isDebugEnabled();
|
||||
try
|
||||
{
|
||||
|
||||
// Run the changes, and only exit if we ran all changes
|
||||
loop: while(true)
|
||||
while (isRunning()||isStopping())
|
||||
{
|
||||
State state=_state.get();
|
||||
switch (state)
|
||||
// Do we have a selections iterator
|
||||
if (_selections==null || !_selections.hasNext())
|
||||
{
|
||||
case PROCESSING:
|
||||
// We can loop on _runChanges list without lock, because only access here.
|
||||
int size = _runChanges.size();
|
||||
for (int i=0;i<size;i++)
|
||||
runChange(_runChanges.get(i));
|
||||
_runChanges.clear();
|
||||
|
||||
// No, so let's select again
|
||||
|
||||
// Do we have new changes?
|
||||
if (!_state.compareAndSet(state, State.LOCKED))
|
||||
continue;
|
||||
if (_addChanges.isEmpty())
|
||||
|
||||
// Do we have selected Keys?
|
||||
if (_selectedKeys!=null)
|
||||
{
|
||||
// yes, then update those keys
|
||||
for (SelectionKey key : _selectedKeys)
|
||||
updateKey(key);
|
||||
_selectedKeys.clear();
|
||||
}
|
||||
|
||||
runChangesAndSetSelecting();
|
||||
|
||||
selectAndSetProcessing();
|
||||
|
||||
}
|
||||
|
||||
// Process any selected keys
|
||||
while (_selections.hasNext())
|
||||
{
|
||||
SelectionKey key = _selections.next();
|
||||
|
||||
if (key.isValid())
|
||||
{
|
||||
Object attachment = key.attachment();
|
||||
try
|
||||
{
|
||||
// No, so lets go selecting
|
||||
_state.set(State.SELECTING);
|
||||
break loop;
|
||||
if (attachment instanceof SelectableEndPoint)
|
||||
{
|
||||
// Try to produce a task
|
||||
Runnable task = ((SelectableEndPoint)attachment).onSelected();
|
||||
if (task!=null)
|
||||
return task;
|
||||
}
|
||||
else if (key.isConnectable())
|
||||
{
|
||||
processConnect(key, (Connect)attachment);
|
||||
}
|
||||
else if (key.isAcceptable())
|
||||
{
|
||||
processAccept(key);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
// We have changes, so switch add/run lists and go keep processing
|
||||
List<Runnable> tmp=_runChanges;
|
||||
_runChanges=_addChanges;
|
||||
_addChanges=tmp;
|
||||
_state.set(State.PROCESSING);
|
||||
continue;
|
||||
|
||||
|
||||
case LOCKED:
|
||||
Thread.yield();
|
||||
continue;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
catch (CancelledKeyException x)
|
||||
{
|
||||
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.warn("Could not process key for channel " + key.channel(), x);
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
|
||||
Object attachment = key.attachment();
|
||||
if (attachment instanceof EndPoint)
|
||||
((EndPoint)attachment).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Do the selecting!
|
||||
int selected;
|
||||
if (debug)
|
||||
{
|
||||
SelectorManager.LOG.debug("Selector loop waiting on select");
|
||||
selected = _selector.select();
|
||||
SelectorManager.LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
|
||||
}
|
||||
else
|
||||
selected = _selector.select();
|
||||
|
||||
// We have finished selecting. This while loop could probably be replaced with just
|
||||
// _state.compareAndSet(State.SELECTING, State.PROCESSING)
|
||||
// since if state is locked by submit, the resulting state will be processing anyway.
|
||||
// but let's be thorough and do the full loop.
|
||||
out: while(true)
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
case SELECTING:
|
||||
// we were still in selecting state, so probably have
|
||||
// selected a key, so goto processing state to handle
|
||||
if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
|
||||
continue;
|
||||
break out;
|
||||
case PROCESSING:
|
||||
// we were already in processing, so were woken up by a change being
|
||||
// submitted, so no state change needed - lets just process
|
||||
break out;
|
||||
case LOCKED:
|
||||
// A change is currently being submitted. This does not matter
|
||||
// here so much, but we will spin anyway so we don't race it later nor
|
||||
// overwrite it's state change.
|
||||
Thread.yield();
|
||||
continue;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
// Process any selected keys
|
||||
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
|
||||
for (SelectionKey key : selectedKeys)
|
||||
{
|
||||
if (key.isValid())
|
||||
{
|
||||
processKey(key);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (debug)
|
||||
SelectorManager.LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
|
||||
Object attachment = key.attachment();
|
||||
if (attachment instanceof EndPoint)
|
||||
((EndPoint)attachment).close();
|
||||
}
|
||||
}
|
||||
|
||||
// Allow any dispatched tasks to run.
|
||||
Thread.yield();
|
||||
|
||||
// Update the keys. This is done separately to calling processKey, so that any momentary changes
|
||||
// to the key state do not have to be submitted, as they are frequently reverted by the dispatched
|
||||
// handling threads.
|
||||
for (SelectionKey key : selectedKeys)
|
||||
{
|
||||
if (key.isValid())
|
||||
updateKey(key);
|
||||
}
|
||||
|
||||
selectedKeys.clear();
|
||||
return null;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (isRunning())
|
||||
SelectorManager.LOG.warn(x);
|
||||
LOG.warn(x);
|
||||
else
|
||||
SelectorManager.LOG.ignore(x);
|
||||
LOG.ignore(x);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void processKey(SelectionKey key)
|
||||
|
||||
private void runChangesAndSetSelecting()
|
||||
{
|
||||
final Object attachment = key.attachment();
|
||||
try
|
||||
|
||||
// Run the changes, and only exit if we ran all changes
|
||||
loop: while(true)
|
||||
{
|
||||
if (attachment instanceof SelectableEndPoint)
|
||||
State state=_state.get();
|
||||
switch (state)
|
||||
{
|
||||
Runnable task=((SelectableEndPoint)attachment).onSelected();
|
||||
if (task!=null)
|
||||
_selectorManager.getExecutor().execute(task);
|
||||
case PROCESSING:
|
||||
// We can loop on _runChanges list without lock, because only access here.
|
||||
int size = _runChanges.size();
|
||||
for (int i=0;i<size;i++)
|
||||
runChange(_runChanges.get(i));
|
||||
_runChanges.clear();
|
||||
|
||||
|
||||
// Do we have new changes?
|
||||
if (!_state.compareAndSet(state, State.LOCKED))
|
||||
continue;
|
||||
if (_addChanges.isEmpty())
|
||||
{
|
||||
// No, so lets go selecting
|
||||
_state.set(State.SELECTING);
|
||||
break loop;
|
||||
}
|
||||
|
||||
// We have changes, so switch add/run lists and go keep processing
|
||||
List<Runnable> tmp=_runChanges;
|
||||
_runChanges=_addChanges;
|
||||
_addChanges=tmp;
|
||||
_state.set(State.PROCESSING);
|
||||
continue;
|
||||
|
||||
case LOCKED:
|
||||
Thread.yield();
|
||||
continue;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
else if (key.isConnectable())
|
||||
{
|
||||
processConnect(key, (Connect)attachment);
|
||||
}
|
||||
else if (key.isAcceptable())
|
||||
{
|
||||
processAccept(key);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
catch (CancelledKeyException x)
|
||||
{
|
||||
SelectorManager.LOG.debug("Ignoring cancelled key for channel {}", key.channel());
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
SelectorManager.LOG.warn("Could not process key for channel " + key.channel(), x);
|
||||
if (attachment instanceof EndPoint)
|
||||
closeNoExceptions((EndPoint)attachment);
|
||||
}
|
||||
}
|
||||
|
||||
private void selectAndSetProcessing() throws IOException
|
||||
{
|
||||
// Do the selecting!
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Selector loop waiting on select");
|
||||
int selected = _selector.select();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
|
||||
|
||||
// We have finished selecting. This while loop could probably be replaced with just
|
||||
// _state.compareAndSet(State.SELECTING, State.PROCESSING)
|
||||
// since if state is locked by submit, the resulting state will be processing anyway.
|
||||
// but let's be thorough and do the full loop.
|
||||
out: while(true)
|
||||
{
|
||||
switch (_state.get())
|
||||
{
|
||||
case SELECTING:
|
||||
// we were still in selecting state, so probably have
|
||||
// selected a key, so goto processing state to handle
|
||||
if (_state.compareAndSet(State.SELECTING, State.PROCESSING))
|
||||
continue;
|
||||
break out;
|
||||
case PROCESSING:
|
||||
// we were already in processing, so were woken up by a change being
|
||||
// submitted, so no state change needed - lets just process
|
||||
break out;
|
||||
case LOCKED:
|
||||
// A change is currently being submitted. This does not matter
|
||||
// here so much, but we will spin anyway so we don't race it later nor
|
||||
// overwrite it's state change.
|
||||
Thread.yield();
|
||||
continue;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
_selectedKeys = _selector.selectedKeys();
|
||||
_selections = _selectedKeys.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProductionComplete()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void updateKey(SelectionKey key)
|
||||
{
|
||||
Object attachment = key.attachment();
|
||||
|
@ -407,7 +407,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
catch (Throwable x)
|
||||
{
|
||||
closeNoExceptions(channel);
|
||||
SelectorManager.LOG.warn("Accept failed for channel " + channel, x);
|
||||
LOG.warn("Accept failed for channel " + channel, x);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -420,15 +420,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
SelectorManager.LOG.ignore(x);
|
||||
LOG.ignore(x);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSelectorThread()
|
||||
{
|
||||
return Thread.currentThread() == _thread;
|
||||
}
|
||||
|
||||
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
|
||||
{
|
||||
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
|
||||
|
@ -436,15 +431,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
|
||||
endPoint.setConnection(connection);
|
||||
_selectorManager.connectionOpened(connection);
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Created {}", endPoint);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created {}", endPoint);
|
||||
return endPoint;
|
||||
}
|
||||
|
||||
public void destroyEndPoint(EndPoint endPoint)
|
||||
{
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Destroyed {}", endPoint);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Destroyed {}", endPoint);
|
||||
Connection connection = endPoint.getConnection();
|
||||
if (connection != null)
|
||||
_selectorManager.connectionClosed(connection);
|
||||
|
@ -462,25 +457,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
{
|
||||
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
|
||||
|
||||
Thread selecting = _thread;
|
||||
|
||||
Object where = "not selecting";
|
||||
StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
|
||||
if (trace != null)
|
||||
{
|
||||
for (StackTraceElement t : trace)
|
||||
if (t.getClassName().startsWith("org.eclipse.jetty."))
|
||||
{
|
||||
where = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Selector selector = _selector;
|
||||
if (selector != null && selector.isOpen())
|
||||
{
|
||||
final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
|
||||
dump.add(where);
|
||||
|
||||
DumpKeys dumpKeys = new DumpKeys(dump);
|
||||
submit(dumpKeys);
|
||||
|
@ -504,6 +484,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -559,13 +541,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
try
|
||||
{
|
||||
SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("{} acceptor={}", this, key);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} acceptor={}", this, key);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
closeNoExceptions(_channel);
|
||||
SelectorManager.LOG.warn(x);
|
||||
LOG.warn(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -593,7 +575,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
catch (Throwable x)
|
||||
{
|
||||
closeNoExceptions(channel);
|
||||
SelectorManager.LOG.debug(x);
|
||||
LOG.debug(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -651,8 +633,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
SocketChannel channel = connect.channel;
|
||||
if (channel.isConnectionPending())
|
||||
{
|
||||
if (SelectorManager.LOG.isDebugEnabled())
|
||||
SelectorManager.LOG.debug("Channel {} timed out while connecting, closing it", channel);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Channel {} timed out while connecting, closing it", channel);
|
||||
connect.failed(new SocketTimeoutException());
|
||||
}
|
||||
}
|
||||
|
@ -738,4 +720,5 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -24,11 +24,8 @@ import java.net.SocketAddress;
|
|||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
||||
import org.eclipse.jetty.util.TypeUtil;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
|
@ -36,7 +33,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
|||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.NonBlockingThread;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
||||
/**
|
||||
|
@ -54,7 +50,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private final Scheduler scheduler;
|
||||
private final ManagedSelector[] _selectors;
|
||||
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||
int _priorityDelta;
|
||||
private long _selectorIndex;
|
||||
|
||||
protected SelectorManager(Executor executor, Scheduler scheduler)
|
||||
|
@ -101,11 +96,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_connectTimeout = milliseconds;
|
||||
}
|
||||
|
||||
|
||||
@ManagedAttribute("The priority delta to apply to selector threads")
|
||||
@Deprecated
|
||||
public int getSelectorPriorityDelta()
|
||||
{
|
||||
return _priorityDelta;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -119,22 +114,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
* delta to (may be negative)
|
||||
* @see Thread#getPriority()
|
||||
*/
|
||||
@Deprecated
|
||||
public void setSelectorPriorityDelta(int selectorPriorityDelta)
|
||||
{
|
||||
int oldDelta = _priorityDelta;
|
||||
_priorityDelta = selectorPriorityDelta;
|
||||
if (oldDelta != selectorPriorityDelta && isStarted())
|
||||
{
|
||||
for (ManagedSelector selector : _selectors)
|
||||
{
|
||||
Thread thread = selector._thread;
|
||||
if (thread != null)
|
||||
{
|
||||
int deltaDiff = selectorPriorityDelta - oldDelta;
|
||||
thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -242,7 +224,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
ManagedSelector selector = newSelector(i);
|
||||
_selectors[i] = selector;
|
||||
selector.start();
|
||||
execute(new NonBlockingThread(selector));
|
||||
execute(selector);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -393,6 +375,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
/**
|
||||
* Callback method invoked when a read or write events has been
|
||||
* detected by the {@link ManagedSelector} for this endpoint.
|
||||
* @return a job that may block or null
|
||||
*/
|
||||
Runnable onSelected();
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ public class SelectChannelEndPointInterestsTest
|
|||
@Override
|
||||
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
|
||||
{
|
||||
return new AbstractConnection(endPoint, getExecutor(), true)
|
||||
return new AbstractConnection(endPoint, getExecutor(), false)
|
||||
{
|
||||
@Override
|
||||
public void onOpen()
|
||||
|
|
|
@ -124,7 +124,7 @@ public class SelectChannelEndPointTest
|
|||
|
||||
public TestConnection(EndPoint endp)
|
||||
{
|
||||
super(endp, _threadPool, true);
|
||||
super(endp, _threadPool, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -94,7 +94,7 @@ public class SelectorManagerTest
|
|||
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
|
||||
{
|
||||
((Callback)attachment).succeeded();
|
||||
return new AbstractConnection(endpoint, executor, true)
|
||||
return new AbstractConnection(endpoint, executor, false)
|
||||
{
|
||||
@Override
|
||||
public void onFillable()
|
||||
|
|
|
@ -169,7 +169,7 @@ public class SslConnectionTest
|
|||
|
||||
public TestConnection(EndPoint endp)
|
||||
{
|
||||
super(endp, _threadPool,true);
|
||||
super(endp, _threadPool,false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,2 +1,5 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
org.eclipse.jetty.LEVEL=INFO
|
||||
org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
|
||||
org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
|
||||
org.eclipse.jetty.io.ssl.SslConnection.LEVEL=DEBUG
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.eclipse.jetty.util.annotation.Name;
|
|||
public class HttpConnectionFactory extends AbstractConnectionFactory implements HttpConfiguration.ConnectionFactory
|
||||
{
|
||||
private final HttpConfiguration _config;
|
||||
private boolean _dispatchIO = true;
|
||||
|
||||
public HttpConnectionFactory()
|
||||
{
|
||||
|
@ -51,19 +50,20 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
|
|||
return _config;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public boolean isDispatchIO()
|
||||
{
|
||||
return _dispatchIO;
|
||||
return false;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setDispatchIO(boolean dispatchIO)
|
||||
{
|
||||
_dispatchIO = dispatchIO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection newConnection(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
return configure(new HttpConnection(_config, connector, endPoint, isDispatchIO()), connector, endPoint);
|
||||
return configure(new HttpConnection(_config, connector, endPoint, false), connector, endPoint);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ public class ExtendedServerTest extends HttpServerTestBase
|
|||
{
|
||||
public ExtendedHttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
|
||||
{
|
||||
super(config,connector,endPoint,true);
|
||||
super(config,connector,endPoint,false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -53,7 +53,7 @@ public class SlowClientWithPipelinedRequestTest
|
|||
@Override
|
||||
public Connection newConnection(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint,true)
|
||||
return configure(new HttpConnection(new HttpConfiguration(),connector,endPoint,false)
|
||||
{
|
||||
@Override
|
||||
public void onFillable()
|
||||
|
|
|
@ -68,7 +68,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
|
|||
|
||||
public ProxyHTTPSPDYConnection(Connector connector, HttpConfiguration config, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
|
||||
{
|
||||
super(config, connector, endPoint, true);
|
||||
super(config, connector, endPoint, false);
|
||||
this.version = version;
|
||||
this.proxyEngineSelector = proxyEngineSelector;
|
||||
this.session = new HTTPSession(version, connector);
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
/**
|
||||
* Marker that wraps a Runnable, indicating that it is running in a thread that must not be blocked.
|
||||
* <p />
|
||||
* Client code can use the thread-local {@link #isNonBlockingThread()} to detect whether they are
|
||||
* in the context of a non-blocking thread, and perform different actions if that's the case.
|
||||
*/
|
||||
public class NonBlockingThread implements Runnable
|
||||
{
|
||||
private final static ThreadLocal<Boolean> __nonBlockingThread = new ThreadLocal<>();
|
||||
|
||||
/**
|
||||
* @return whether the current thread is a thread that must not block.
|
||||
*/
|
||||
public static boolean isNonBlockingThread()
|
||||
{
|
||||
return Boolean.TRUE.equals(__nonBlockingThread.get());
|
||||
}
|
||||
|
||||
private final Runnable delegate;
|
||||
|
||||
public NonBlockingThread(Runnable delegate)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
__nonBlockingThread.set(Boolean.TRUE);
|
||||
delegate.run();
|
||||
}
|
||||
finally
|
||||
{
|
||||
__nonBlockingThread.remove();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue