From 33fa7afb4715867b12fdf699becc68b319ad3830 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 30 Aug 2011 14:24:28 +1000 Subject: [PATCH] 356144 added SelectorManager.setSelectorPriorityDelta(int) --- .../eclipse/jetty/client/SelectConnector.java | 24 +--- .../eclipse/jetty/io/nio/SelectorManager.java | 132 +++++++++++++----- .../jetty/server/handler/ConnectHandler.java | 17 --- .../server/nio/SelectChannelConnector.java | 46 +----- .../util/component/AbstractLifeCycle.java | 5 + .../util/component/AggregateLifeCycle.java | 8 +- .../jetty/util/thread/QueuedThreadPool.java | 11 +- .../websocket/WebSocketClientFactory.java | 53 ++++--- 8 files changed, 143 insertions(+), 153 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index 89a3884ac37..333c4166ba4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -41,7 +41,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Timeout; -class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, Runnable +class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector { private static final Logger LOG = Log.getLogger(SelectConnector.class); @@ -65,7 +65,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, { super.doStart(); - _selectorManager.start(); final boolean direct=_httpClient.getUseDirectBuffers(); @@ -76,7 +75,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), direct?Type.DIRECT:Type.INDIRECT,1024); - _httpClient._threadPool.dispatch(this); + _selectorManager.start(); } /* ------------------------------------------------------------ */ @@ -117,25 +116,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, { destination.onConnectionFailed(ex); } - - } - - /* ------------------------------------------------------------ */ - public void run() - { - while (_httpClient.isRunning()) - { - try - { - _selectorManager.doSelect(0); - } - catch (Exception e) - { - LOG.warn(e.toString()); - LOG.debug(e); - Thread.yield(); - } - } } /* ------------------------------------------------------------ */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index 91656afbb06..268e7b422eb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -54,7 +54,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task; */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - public static final Logger __log=Log.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio"); // TODO Tune these by approx system speed. private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue(); @@ -71,6 +71,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private int _selectSets=1; private volatile int _set; private boolean _deferringInterestedOps0=true; + private int _selectorPriorityDelta=0; /* ------------------------------------------------------------ */ /** @@ -178,6 +179,25 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa set.wakeup(); } + /* ------------------------------------------------------------ */ + /** + * @return delta The value to add to the selector thread priority. + */ + public int getSelectorPriorityDelta() + { + return _selectorPriorityDelta; + } + + /* ------------------------------------------------------------ */ + /** Set the selector thread priorty delta. + * @param delta The value to add to the selector thread priority. + */ + public void setSelectorPriorityDelta(int delta) + { + _selectorPriorityDelta=delta; + } + + /* ------------------------------------------------------------ */ /** * @return the lowResourcesConnections @@ -218,17 +238,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; } - /* ------------------------------------------------------------ */ - /** - * @param acceptorID - * @throws IOException - */ - public void doSelect(int acceptorID) throws IOException - { - SelectSet[] sets= _selectSet; - if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null) - sets[acceptorID].doSelect(); - } /* ------------------------------------------------------------------------------- */ public abstract boolean dispatch(Runnable task); @@ -245,6 +254,53 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _selectSet[i]= new SelectSet(i); super.doStart(); + + // start a thread to Select + for (int i=0;i_log) { if (_paused>0) - __log.debug(this+" Busy selector - injecting delay "+_paused+" times"); + LOG.debug(this+" Busy selector - injecting delay "+_paused+" times"); if (_jvmFix2>0) - __log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); + LOG.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); if (_jvmFix1>0) - __log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); + LOG.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); - else if(__log.isDebugEnabled() && _jvmFix0>0) - __log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times"); + else if(LOG.isDebugEnabled() && _jvmFix0>0) + LOG.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times"); _paused=0; _jvmFix2=0; _jvmFix1=0; @@ -720,7 +776,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch(InterruptedException e) { - __log.ignore(e); + LOG.ignore(e); } } else if (_jvmBug==__JVMBUG_THRESHHOLD) @@ -754,7 +810,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel)) { final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment(); - __log.warn("Busy Key "+busy.channel()+" "+endpoint); + LOG.warn("Busy Key "+busy.channel()+" "+endpoint); busy.cancel(); if (endpoint!=null) { @@ -768,7 +824,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch (IOException e) { - __log.ignore(e); + LOG.ignore(e); } } }); @@ -907,7 +963,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch(Exception e) { - __log.ignore(e); + LOG.ignore(e); } // close endpoints and selector @@ -927,7 +983,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch(IOException e) { - __log.ignore(e); + LOG.ignore(e); } } } @@ -941,7 +997,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch (IOException e) { - __log.ignore(e); + LOG.ignore(e); } _selector=null; } @@ -992,7 +1048,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } catch(InterruptedException e) { - __log.ignore(e); + LOG.ignore(e); } AggregateLifeCycle.dump(out,indent,dump); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 6f71abb9af8..47c086b0170 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -153,23 +153,6 @@ public class ConnectHandler extends HandlerWrapper ((LifeCycle)_threadPool).start(); _selectorManager.start(); - _threadPool.dispatch(new Runnable() - { - public void run() - { - while (isRunning()) - { - try - { - _selectorManager.doSelect(0); - } - catch (IOException x) - { - _logger.warn("Unexpected exception", x); - } - } - } - }); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 70fb8d06ad0..c8cff99341c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -36,6 +36,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.Timeout.Task; /* ------------------------------------------------------------------------------- */ @@ -241,46 +242,6 @@ public class SelectChannelConnector extends AbstractNIOConnector _manager.start(); super.doStart(); - - // start a thread to Select - for (int i=0;i _dependentBeans=new ConcurrentLinkedQueue(); + private final List _dependentBeans=new CopyOnWriteArrayList(); public void destroy() { @@ -49,7 +51,9 @@ public class AggregateLifeCycle extends AbstractLifeCycle implements Destroyable protected void doStop() throws Exception { super.doStop(); - for (Object o:_dependentBeans) + List reverse = new ArrayList(_dependentBeans); + Collections.reverse(reverse); + for (Object o:reverse) { if (o instanceof LifeCycle) ((LifeCycle)o).stop(); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 466082df6de..5c54999a032 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -139,14 +139,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { LOG.warn(size+" threads could not be stopped"); - if (LOG.isDebugEnabled()) + if (size==1 || LOG.isDebugEnabled()) { for (Thread unstopped : _threads) { - LOG.debug("Couldn't stop "+unstopped); + LOG.info("Couldn't stop "+unstopped); for (StackTraceElement element : unstopped.getStackTrace()) { - LOG.debug(" at "+element); + LOG.info(" at "+element); } } } @@ -507,9 +507,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public String toString() { - return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; + return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}#"+getState(); } - + + /* ------------------------------------------------------------ */ private Runnable idleJobPoll() throws InterruptedException { return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java index 45d6d1d0ffd..fbd8117ba85 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketClientFactory.java @@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ThreadPool; *

WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).

*

WebSocketClients with different configurations should share the same factory to avoid to waste resources.

- * + *

If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)}, + * so it's lifecycle must be controlled externally. * @see WebSocketClient */ public class WebSocketClientFactory extends AggregateLifeCycle @@ -51,7 +52,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle */ public WebSocketClientFactory() { - this(new QueuedThreadPool()); + _threadPool=new QueuedThreadPool(); + addBean(_threadPool); + _buffers=new WebSocketBuffers(8*1024); + addBean(_buffers); + _maskGen=new RandomMaskGen(); + addBean(_maskGen); + _selector=new WebSocketClientSelector(); + addBean(_selector); } /* ------------------------------------------------------------ */ @@ -61,7 +69,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle */ public WebSocketClientFactory(ThreadPool threadPool) { - this(threadPool,new RandomMaskGen(),16*1024); + _threadPool=threadPool; + addBean(threadPool); + _buffers=new WebSocketBuffers(8*1024); + addBean(_buffers); + _maskGen=new RandomMaskGen(); + addBean(_maskGen); + _selector=new WebSocketClientSelector(); + addBean(_selector); } /* ------------------------------------------------------------ */ @@ -74,11 +89,12 @@ public class WebSocketClientFactory extends AggregateLifeCycle public WebSocketClientFactory(ThreadPool threadPool,MaskGen maskGen,int bufferSize) { _threadPool=threadPool; - _selector=new WebSocketClientSelector(); + addBean(threadPool); _buffers=new WebSocketBuffers(bufferSize); + addBean(_buffers); _maskGen=maskGen; + _selector=new WebSocketClientSelector(); addBean(_selector); - addBean(_threadPool); } /* ------------------------------------------------------------ */ @@ -120,6 +136,8 @@ public class WebSocketClientFactory extends AggregateLifeCycle { if (isRunning()) throw new IllegalStateException(getState()); + if (removeBean(_maskGen)) + addBean(maskGen); _maskGen=maskGen; } @@ -132,7 +150,9 @@ public class WebSocketClientFactory extends AggregateLifeCycle { if (isRunning()) throw new IllegalStateException(getState()); + removeBean(_buffers); _buffers=new WebSocketBuffers(bufferSize); + addBean(_buffers); } /* ------------------------------------------------------------ */ @@ -161,29 +181,6 @@ public class WebSocketClientFactory extends AggregateLifeCycle protected void doStart() throws Exception { super.doStart(); - - // Start a selector threads - for (int i=0;i<_selector.getSelectSets();i++) - { - final int id=i; - _threadPool.dispatch(new Runnable() - { - public void run() - { - while(isRunning()) - { - try - { - _selector.doSelect(id); - } - catch (IOException e) - { - __log.warn(e); - } - } - } - }); - } } /* ------------------------------------------------------------ */