356144 added SelectorManager.setSelectorPriorityDelta(int)

This commit is contained in:
Greg Wilkins 2011-08-30 14:24:28 +10:00
parent 97ad4afb19
commit 33fa7afb47
8 changed files with 143 additions and 153 deletions

View File

@ -41,7 +41,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Timeout; import org.eclipse.jetty.util.thread.Timeout;
class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
{ {
private static final Logger LOG = Log.getLogger(SelectConnector.class); private static final Logger LOG = Log.getLogger(SelectConnector.class);
@ -65,7 +65,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{ {
super.doStart(); super.doStart();
_selectorManager.start();
final boolean direct=_httpClient.getUseDirectBuffers(); final boolean direct=_httpClient.getUseDirectBuffers();
@ -76,7 +75,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
direct?Type.DIRECT:Type.INDIRECT,1024); direct?Type.DIRECT:Type.INDIRECT,1024);
_httpClient._threadPool.dispatch(this); _selectorManager.start();
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -117,25 +116,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{ {
destination.onConnectionFailed(ex); destination.onConnectionFailed(ex);
} }
}
/* ------------------------------------------------------------ */
public void run()
{
while (_httpClient.isRunning())
{
try
{
_selectorManager.doSelect(0);
}
catch (Exception e)
{
LOG.warn(e.toString());
LOG.debug(e);
Thread.yield();
}
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -54,7 +54,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
*/ */
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{ {
public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio"); public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
// TODO Tune these by approx system speed. // TODO Tune these by approx system speed.
private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue(); private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
@ -71,6 +71,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private int _selectSets=1; private int _selectSets=1;
private volatile int _set; private volatile int _set;
private boolean _deferringInterestedOps0=true; private boolean _deferringInterestedOps0=true;
private int _selectorPriorityDelta=0;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -178,6 +179,25 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
set.wakeup(); set.wakeup();
} }
/* ------------------------------------------------------------ */
/**
* @return delta The value to add to the selector thread priority.
*/
public int getSelectorPriorityDelta()
{
return _selectorPriorityDelta;
}
/* ------------------------------------------------------------ */
/** Set the selector thread priorty delta.
* @param delta The value to add to the selector thread priority.
*/
public void setSelectorPriorityDelta(int delta)
{
_selectorPriorityDelta=delta;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return the lowResourcesConnections * @return the lowResourcesConnections
@ -218,17 +238,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
} }
/* ------------------------------------------------------------ */
/**
* @param acceptorID
* @throws IOException
*/
public void doSelect(int acceptorID) throws IOException
{
SelectSet[] sets= _selectSet;
if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
sets[acceptorID].doSelect();
}
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public abstract boolean dispatch(Runnable task); public abstract boolean dispatch(Runnable task);
@ -245,6 +254,53 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_selectSet[i]= new SelectSet(i); _selectSet[i]= new SelectSet(i);
super.doStart(); super.doStart();
// start a thread to Select
for (int i=0;i<getSelectSets();i++)
{
final int id=i;
dispatch(new Runnable()
{
public void run()
{
SelectSet set=_selectSet[id];
String name=Thread.currentThread().getName();
int priority=Thread.currentThread().getPriority();
try
{
Thread.currentThread().setName(name+" Selector"+id);
if (getSelectorPriorityDelta()!=0)
Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
while (isRunning())
{
try
{
set.doSelect();
}
catch(ThreadDeath e)
{
throw e;
}
catch(IOException e)
{
LOG.ignore(e);
}
catch(Exception e)
{
LOG.warn(e);
}
}
}
finally
{
Thread.currentThread().setName(name);
if (getSelectorPriorityDelta()!=0)
Thread.currentThread().setPriority(priority);
}
}
});
}
} }
@ -297,8 +353,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
{ {
__log.warn(ex+","+channel+","+attachment); LOG.warn(ex+","+channel+","+attachment);
__log.debug(ex); LOG.debug(ex);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -449,7 +505,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch (CancelledKeyException e) catch (CancelledKeyException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
catch (Throwable e) catch (Throwable e)
{ {
@ -457,9 +513,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
throw (ThreadDeath)e; throw (ThreadDeath)e;
if (isRunning()) if (isRunning())
__log.warn(e); LOG.warn(e);
else else
__log.debug(e); LOG.debug(e);
try try
{ {
@ -467,7 +523,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(IOException e2) catch(IOException e2)
{ {
__log.debug(e2); LOG.debug(e2);
} }
} }
} }
@ -491,7 +547,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(InterruptedException e) catch(InterruptedException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
now=System.currentTimeMillis(); now=System.currentTimeMillis();
} }
@ -585,14 +641,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch (CancelledKeyException e) catch (CancelledKeyException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
catch (Exception e) catch (Exception e)
{ {
if (isRunning()) if (isRunning())
__log.warn(e); LOG.warn(e);
else else
__log.ignore(e); LOG.ignore(e);
try try
{ {
@ -601,7 +657,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(IOException e2) catch(IOException e2)
{ {
__log.debug(e2); LOG.debug(e2);
} }
if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
@ -646,13 +702,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
catch (ClosedSelectorException e) catch (ClosedSelectorException e)
{ {
if (isRunning()) if (isRunning())
__log.warn(e); LOG.warn(e);
else else
__log.ignore(e); LOG.ignore(e);
} }
catch (CancelledKeyException e) catch (CancelledKeyException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
finally finally
{ {
@ -687,16 +743,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (now>_log) if (now>_log)
{ {
if (_paused>0) if (_paused>0)
__log.debug(this+" Busy selector - injecting delay "+_paused+" times"); LOG.debug(this+" Busy selector - injecting delay "+_paused+" times");
if (_jvmFix2>0) if (_jvmFix2>0)
__log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); LOG.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
if (_jvmFix1>0) if (_jvmFix1>0)
__log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); LOG.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
else if(__log.isDebugEnabled() && _jvmFix0>0) else if(LOG.isDebugEnabled() && _jvmFix0>0)
__log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times"); LOG.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
_paused=0; _paused=0;
_jvmFix2=0; _jvmFix2=0;
_jvmFix1=0; _jvmFix1=0;
@ -720,7 +776,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(InterruptedException e) catch(InterruptedException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
} }
else if (_jvmBug==__JVMBUG_THRESHHOLD) else if (_jvmBug==__JVMBUG_THRESHHOLD)
@ -754,7 +810,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel)) if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
{ {
final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment(); final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
__log.warn("Busy Key "+busy.channel()+" "+endpoint); LOG.warn("Busy Key "+busy.channel()+" "+endpoint);
busy.cancel(); busy.cancel();
if (endpoint!=null) if (endpoint!=null)
{ {
@ -768,7 +824,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch (IOException e) catch (IOException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
} }
}); });
@ -907,7 +963,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(Exception e) catch(Exception e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
// close endpoints and selector // close endpoints and selector
@ -927,7 +983,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(IOException e) catch(IOException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
} }
} }
@ -941,7 +997,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch (IOException e) catch (IOException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
_selector=null; _selector=null;
} }
@ -992,7 +1048,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
} }
catch(InterruptedException e) catch(InterruptedException e)
{ {
__log.ignore(e); LOG.ignore(e);
} }
AggregateLifeCycle.dump(out,indent,dump); AggregateLifeCycle.dump(out,indent,dump);
} }

View File

@ -153,23 +153,6 @@ public class ConnectHandler extends HandlerWrapper
((LifeCycle)_threadPool).start(); ((LifeCycle)_threadPool).start();
_selectorManager.start(); _selectorManager.start();
_threadPool.dispatch(new Runnable()
{
public void run()
{
while (isRunning())
{
try
{
_selectorManager.doSelect(0);
}
catch (IOException x)
{
_logger.warn("Unexpected exception", x);
}
}
}
});
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.Timeout.Task; import org.eclipse.jetty.util.thread.Timeout.Task;
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
@ -241,46 +242,6 @@ public class SelectChannelConnector extends AbstractNIOConnector
_manager.start(); _manager.start();
super.doStart(); super.doStart();
// start a thread to Select
for (int i=0;i<getAcceptors();i++)
{
final int id=i;
_manager.dispatch(new Runnable()
{
public void run()
{
String name=Thread.currentThread().getName();
try
{
Thread.currentThread().setName(name+" Selector"+id+" "+SelectChannelConnector.this);
while (isRunning())
{
try
{
_manager.doSelect(id);
}
catch(ThreadDeath e)
{
throw e;
}
catch(IOException e)
{
LOG.ignore(e);
}
catch(Exception e)
{
LOG.warn(e);
}
}
}
finally
{
Thread.currentThread().setName(name);
}
}
});
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -372,7 +333,10 @@ public class SelectChannelConnector extends AbstractNIOConnector
@Override @Override
public boolean dispatch(Runnable task) public boolean dispatch(Runnable task)
{ {
return getThreadPool().dispatch(task); ThreadPool pool=getThreadPool();
if (pool==null)
pool=getServer().getThreadPool();
return pool.dispatch(task);
} }
@Override @Override

View File

@ -209,4 +209,9 @@ public abstract class AbstractLifeCycle implements LifeCycle
public void lifeCycleStopped(LifeCycle event) {} public void lifeCycleStopped(LifeCycle event) {}
public void lifeCycleStopping(LifeCycle event) {} public void lifeCycleStopping(LifeCycle event) {}
} }
public String toString()
{
return super.toString()+"#"+getState();
}
} }

View File

@ -3,10 +3,12 @@ package org.eclipse.jetty.util.component;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -20,7 +22,7 @@ import org.eclipse.jetty.util.log.Logger;
public class AggregateLifeCycle extends AbstractLifeCycle implements Destroyable, Dumpable public class AggregateLifeCycle extends AbstractLifeCycle implements Destroyable, Dumpable
{ {
private static final Logger LOG = Log.getLogger(AggregateLifeCycle.class); private static final Logger LOG = Log.getLogger(AggregateLifeCycle.class);
private final Queue<Object> _dependentBeans=new ConcurrentLinkedQueue<Object>(); private final List<Object> _dependentBeans=new CopyOnWriteArrayList<Object>();
public void destroy() public void destroy()
{ {
@ -49,7 +51,9 @@ public class AggregateLifeCycle extends AbstractLifeCycle implements Destroyable
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
super.doStop(); super.doStop();
for (Object o:_dependentBeans) List<Object> reverse = new ArrayList<Object>(_dependentBeans);
Collections.reverse(reverse);
for (Object o:reverse)
{ {
if (o instanceof LifeCycle) if (o instanceof LifeCycle)
((LifeCycle)o).stop(); ((LifeCycle)o).stop();

View File

@ -139,14 +139,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
LOG.warn(size+" threads could not be stopped"); LOG.warn(size+" threads could not be stopped");
if (LOG.isDebugEnabled()) if (size==1 || LOG.isDebugEnabled())
{ {
for (Thread unstopped : _threads) for (Thread unstopped : _threads)
{ {
LOG.debug("Couldn't stop "+unstopped); LOG.info("Couldn't stop "+unstopped);
for (StackTraceElement element : unstopped.getStackTrace()) for (StackTraceElement element : unstopped.getStackTrace())
{ {
LOG.debug(" at "+element); LOG.info(" at "+element);
} }
} }
} }
@ -507,9 +507,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override @Override
public String toString() public String toString()
{ {
return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}#"+getState();
} }
/* ------------------------------------------------------------ */
private Runnable idleJobPoll() throws InterruptedException private Runnable idleJobPoll() throws InterruptedException
{ {
return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);

View File

@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
* (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p> * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
* <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p> * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
* * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
* so it's lifecycle must be controlled externally.
* @see WebSocketClient * @see WebSocketClient
*/ */
public class WebSocketClientFactory extends AggregateLifeCycle public class WebSocketClientFactory extends AggregateLifeCycle
@ -51,7 +52,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle
*/ */
public WebSocketClientFactory() public WebSocketClientFactory()
{ {
this(new QueuedThreadPool()); _threadPool=new QueuedThreadPool();
addBean(_threadPool);
_buffers=new WebSocketBuffers(8*1024);
addBean(_buffers);
_maskGen=new RandomMaskGen();
addBean(_maskGen);
_selector=new WebSocketClientSelector();
addBean(_selector);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -61,7 +69,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle
*/ */
public WebSocketClientFactory(ThreadPool threadPool) public WebSocketClientFactory(ThreadPool threadPool)
{ {
this(threadPool,new RandomMaskGen(),16*1024); _threadPool=threadPool;
addBean(threadPool);
_buffers=new WebSocketBuffers(8*1024);
addBean(_buffers);
_maskGen=new RandomMaskGen();
addBean(_maskGen);
_selector=new WebSocketClientSelector();
addBean(_selector);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -74,11 +89,12 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize) public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize)
{ {
_threadPool=threadPool; _threadPool=threadPool;
_selector=new WebSocketClientSelector(); addBean(threadPool);
_buffers=new WebSocketBuffers(bufferSize); _buffers=new WebSocketBuffers(bufferSize);
addBean(_buffers);
_maskGen=maskGen; _maskGen=maskGen;
_selector=new WebSocketClientSelector();
addBean(_selector); addBean(_selector);
addBean(_threadPool);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -120,6 +136,8 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{ {
if (isRunning()) if (isRunning())
throw new IllegalStateException(getState()); throw new IllegalStateException(getState());
if (removeBean(_maskGen))
addBean(maskGen);
_maskGen=maskGen; _maskGen=maskGen;
} }
@ -132,7 +150,9 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{ {
if (isRunning()) if (isRunning())
throw new IllegalStateException(getState()); throw new IllegalStateException(getState());
removeBean(_buffers);
_buffers=new WebSocketBuffers(bufferSize); _buffers=new WebSocketBuffers(bufferSize);
addBean(_buffers);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -161,29 +181,6 @@ public class WebSocketClientFactory extends AggregateLifeCycle
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
super.doStart(); super.doStart();
// Start a selector threads
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */