diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 3365a5dd207..c0f892158ac 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -15,25 +15,21 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Timeout.Task; /* ------------------------------------------------------------ */ /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableAsyncEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, SelectorManager.SelectableAsyncEndPoint { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); @@ -48,7 +44,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa /** The desired value for {@link SelectionKey#interestOps()} */ private int _interestOps; - /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ + /** true if {@link SelectSet#destroyEndPoint(SelectorManager.SelectableAsyncEndPoint)} has not been called */ private boolean _open; private volatile boolean _idlecheck; @@ -64,7 +60,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa return false; } }; - + private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override @@ -123,14 +119,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa /* ------------------------------------------------------------ */ /** * Called by selectSet to schedule handling - * + * */ @Override public void onSelected() { boolean can_read; boolean can_write; - + synchronized (this) { _selected = true; @@ -146,7 +142,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa } finally { - doUpdateKey(); + _selectSet.submit(this); _selected = false; } } @@ -156,18 +152,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa _writeFlusher.completeWrite(); } - /* ------------------------------------------------------------ */ - public void cancelTimeout(Task task) - { - getSelectSet().cancelTimeout(task); - } - - /* ------------------------------------------------------------ */ - public void scheduleTimeout(Task task, long timeoutMs) - { - getSelectSet().scheduleTimeout(task,timeoutMs); - } - /* ------------------------------------------------------------ */ @Override public void setCheckForIdle(boolean check) @@ -200,10 +184,10 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa if (idleForMs > max_idle_time) { notIdle(); - + if (_idlecheck) _connection.onIdleExpired(idleForMs); - + TimeoutException timeout = new TimeoutException(); _readInterest.failed(timeout); _writeFlusher.failed(timeout); @@ -212,7 +196,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa } } } - + /* ------------------------------------------------------------ */ @Override @@ -225,12 +209,12 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa } /* ------------------------------------------------------------ */ - @Override + @Override public void readable(C context, Callback callback) throws IllegalStateException { _readInterest.readable(context,callback); } - + /* ------------------------------------------------------------ */ @Override public void write(C context, Callback callback, ByteBuffer... buffers) throws IllegalStateException @@ -274,13 +258,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa if (_interestOps != current_ops && !_changing) { _changing = true; - _selectSet.addChange(this); - _selectSet.wakeup(); + _selectSet.submit(this); } } } } + @Override + public void run() + { + doUpdateKey(); + } + /* ------------------------------------------------------------ */ /** * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey @@ -364,15 +353,15 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa super.close(); updateKey(); } - + /* ------------------------------------------------------------ */ - @Override + @Override public void onClose() { _writeFlusher.close(); _readInterest.close(); } - + /* ------------------------------------------------------------ */ @Override public String toString() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index c4ffef36194..81e9c024e50 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -14,13 +14,13 @@ package org.eclipse.jetty.io; import java.io.IOException; +import java.net.ConnectException; import java.nio.channels.CancelledKeyException; import java.nio.channels.Channel; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -29,19 +29,18 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.Name; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Timeout; -import org.eclipse.jetty.util.thread.Timeout.Task; -/* ------------------------------------------------------------ */ /** * The Selector Manager manages and number of SelectSets to allow * NIO scheduling to scale to large numbers of connections. @@ -49,44 +48,32 @@ import org.eclipse.jetty.util.thread.Timeout.Task; */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - public static final Logger LOG=Log.getLogger(SelectorManager.class); - - private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); - private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); - private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); - private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); + public static final Logger LOG = Log.getLogger(SelectorManager.class); + private final Executor _executor; + private final SelectSet[] _selectSets; private int _maxIdleTime; - private int _lowResourcesMaxIdleTime; - private long _lowResourcesConnections; - private SelectSet[] _selectSet; - private int _selectSets=1; - private volatile int _set=0; - private boolean _deferringInterestedOps0=true; - private int _selectorPriorityDelta=0; + private long _selectSetIndex; + + protected SelectorManager(Executor executor) + { + this(executor, 1); + } + + protected SelectorManager(@Name("executor") Executor executor, @Name("selectSets") int selectSets) + { + this._executor = executor; + this._selectSets = new SelectSet[selectSets]; + } - /* ------------------------------------------------------------ */ /** - * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. - * @see #setLowResourcesMaxIdleTime(long) + * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed. */ public void setMaxIdleTime(long maxIdleTime) { _maxIdleTime=(int)maxIdleTime; } - /* ------------------------------------------------------------ */ - /** - * @param selectSets number of select sets to create - */ - public void setSelectSets(int selectSets) - { - long lrc = _lowResourcesConnections * _selectSets; - _selectSets=selectSets; - _lowResourcesConnections=lrc/_selectSets; - } - - /* ------------------------------------------------------------ */ /** * @return the max idle time */ @@ -95,830 +82,407 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa return _maxIdleTime; } - /* ------------------------------------------------------------ */ /** * @return the number of select sets in use */ public int getSelectSets() { - return _selectSets; + return _selectSets.length; } - /* ------------------------------------------------------------ */ - /** - * @param i - * @return The select set - */ - public SelectSet getSelectSet(int i) - { - return _selectSet[i]; - } - - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - * @param att Attached Object - */ - public void register(SocketChannel channel, Object att) + private SelectSet chooseSelectSet() { // The ++ increment here is not atomic, but it does not matter. // so long as the value changes sometimes, then connections will // be distributed over the available sets. - - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel,att); - set.wakeup(); - } + long s = _selectSetIndex++; + int index = (int)(s % getSelectSets()); + return _selectSets[index]; } - - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - */ - public void register(SocketChannel channel) - { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. - - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel); - set.wakeup(); - } - } - - /* ------------------------------------------------------------ */ - /** Register a {@link ServerSocketChannel} - * @param acceptChannel - */ - public void register(ServerSocketChannel acceptChannel) - { - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet set=_selectSet[s]; - set.addChange(acceptChannel); - set.wakeup(); - } - - /* ------------------------------------------------------------ */ /** - * @return delta The value to add to the selector thread priority. + * Registers a channel + * @param channel the channel to register + * @param attachment Attached Object */ - public int getSelectorPriorityDelta() + public void connect(SocketChannel channel, Object attachment) { - return _selectorPriorityDelta; + SelectSet set = chooseSelectSet(); + set.submit(set.new Connect(channel, attachment)); } - /* ------------------------------------------------------------ */ - /** Set the selector thread priorty delta. - * @param delta The value to add to the selector thread priority. - */ - public void setSelectorPriorityDelta(int delta) - { - _selectorPriorityDelta=delta; - } - - - /* ------------------------------------------------------------ */ /** - * @return the lowResourcesConnections + * Registers a channel + * @param channel the channel to register */ - public long getLowResourcesConnections() + public void accept(final SocketChannel channel) { - return _lowResourcesConnections*_selectSets; + final SelectSet set = chooseSelectSet(); + set.submit(set.new Accept(channel)); } - /* ------------------------------------------------------------ */ - /** - * Set the number of connections, which if exceeded places this manager in low resources state. - * This is not an exact measure as the connection count is averaged over the select sets. - * @param lowResourcesConnections the number of connections - * @see #setLowResourcesMaxIdleTime(long) - */ - public void setLowResourcesConnections(long lowResourcesConnections) + private void execute(Runnable task) { - _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; + _executor.execute(task); } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesMaxIdleTime - */ - public long getLowResourcesMaxIdleTime() - { - return _lowResourcesMaxIdleTime; - } - - /* ------------------------------------------------------------ */ - /** - * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} - * @see #setMaxIdleTime(long) - */ - public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) - { - _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; - } - - - /* ------------------------------------------------------------------------------- */ - public abstract boolean dispatch(Runnable task); - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see org.eclipse.component.AbstractLifeCycle#doStart() - */ @Override protected void doStart() throws Exception { - _selectSet = new SelectSet[_selectSets]; - for (int i=0;i<_selectSet.length;i++) - _selectSet[i]= new SelectSet(i); - super.doStart(); - - // start a thread to Select - for (int i=0;i _changes = new ConcurrentLinkedQueue(); + private void sleep(long delay) + { + try + { + Thread.sleep(delay); + } + catch (InterruptedException x) + { + LOG.ignore(x); + } + } + } - private volatile Selector _selector; - - private volatile Thread _selecting; - private int _busySelects; - private long _monitorNext; - private boolean _pausing; - private boolean _paused; - private volatile long _idleTick; + public class SelectSet extends AbstractLifeCycle implements Runnable, Dumpable + { + private final ConcurrentLinkedQueue _changes = new ConcurrentLinkedQueue<>(); private ConcurrentMap _endPoints = new ConcurrentHashMap<>(); + private final int _id; + private Selector _selector; + private Thread _thread; + private boolean needsWakeup = true; - /* ------------------------------------------------------------ */ - SelectSet(int acceptorID) throws Exception + protected SelectSet(int id) { - _setID=acceptorID; + _id = id; + } - _idleTick = System.currentTimeMillis(); - _timeout = new Timeout(this); - _timeout.setDuration(0L); - - // create a selector; + @Override + protected void doStart() throws Exception + { + super.doStart(); _selector = Selector.open(); - _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; } - /* ------------------------------------------------------------ */ - public void addChange(Object change) + @Override + protected void doStop() throws Exception { - _changes.add(change); + Stop task = new Stop(); + submit(task); + task.await(getStopTimeout(), TimeUnit.MILLISECONDS); } - /* ------------------------------------------------------------ */ - public void addChange(SelectableChannel channel, Object att) + public boolean submit(Runnable change) { - if (att==null) - addChange(channel); - else if (att instanceof EndPoint) - addChange(att); + if (Thread.currentThread() != _thread) + { + _changes.add(change); + if (LOG.isDebugEnabled()) + LOG.debug("Queued change {}", change); + boolean wakeup = needsWakeup; + if (wakeup) + wakeup(); + return false; + } else - addChange(new ChannelAndAttachment(channel,att)); + { + if (LOG.isDebugEnabled()) + LOG.debug("Submitted change {}", change); + runChanges(); + runChange(change); + return true; + } + } + + private void runChanges() + { + Runnable change; + while ((change = _changes.poll()) != null) + { + runChange(change); + } + } + + protected void runChange(Runnable change) + { + LOG.debug("Running change {}", change); + change.run(); + } + + @Override + public void run() + { + _thread = Thread.currentThread(); + String name = _thread.getName(); + try + { + _thread.setName(name + " Selector" + _id); + LOG.debug("Starting {} on {}", _thread, this); + while (isRunning()) + { + try + { + doSelect(); + } + catch (IOException e) + { + LOG.warn(e); + } + } + processChanges(); + } + finally + { + LOG.debug("Stopped {} on {}", _thread, this); + _thread.setName(name); + } } - /* ------------------------------------------------------------ */ /** - * Select and dispatch tasks found from changes and the selector. + * Select and execute tasks found from changes and the selector. * * @throws IOException */ public void doSelect() throws IOException { + boolean debug = LOG.isDebugEnabled(); try { - _selecting=Thread.currentThread(); - final Selector selector=_selector; - // Stopped concurrently ? - if (selector == null) - return; + processChanges(); - // Make any key changes required - Object change; - int changes=_changes.size(); - while (changes-->0 && (change=_changes.poll())!=null) + if (debug) + LOG.debug("Selector loop waiting on select"); + int selected = _selector.select(); + if (debug) + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); + + needsWakeup = false; + + Set selectedKeys = _selector.selectedKeys(); + for (SelectionKey key: selectedKeys) { - Channel ch=null; - - try - { - if (change instanceof EndPoint) - { - // Update the operations for a key. - SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)change; - ch=endpoint.getChannel(); - endpoint.doUpdateKey(); - } - else if (change instanceof ChannelAndAttachment) - { - // finish accepting/connecting this connection - final ChannelAndAttachment asc = (ChannelAndAttachment)change; - final SelectableChannel channel=asc._channel; - ch=channel; - final Object att = asc._attachment; - - if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) - { - SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); - AsyncEndPoint endpoint = createEndPoint((SocketChannel)channel,key); - key.attach(endpoint); - } - else if (channel.isOpen()) - { - channel.register(selector,SelectionKey.OP_CONNECT,att); - } - } - else if (change instanceof SocketChannel) - { - // Newly registered channel - final SocketChannel channel=(SocketChannel)change; - ch=channel; - SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null); - AsyncEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - } - else if (change instanceof ChangeTask) - { - LOG.warn("DO WE NEED THIS????"); - ((Runnable)change).run(); - } - else if (change instanceof Runnable) - { - dispatch((Runnable)change); - } - else - throw new IllegalArgumentException(change.toString()); - } - catch (CancelledKeyException e) - { - LOG.ignore(e); - } - catch (Throwable e) - { - if (isRunning()) - LOG.warn(e); - else - LOG.debug(e); - - try - { - if (ch!=null) - ch.close(); - } - catch(IOException e2) - { - LOG.debug(e2); - } - } - } - - - // Do and instant select to see if any connections can be handled. - int selected=selector.selectNow(); - - long now=System.currentTimeMillis(); - - // if no immediate things to do - if (selected==0 && selector.selectedKeys().isEmpty()) - { - // If we are in pausing mode - if (_pausing) - { - try - { - Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop - } - catch(InterruptedException e) - { - LOG.ignore(e); - } - now=System.currentTimeMillis(); - } - - // workout how long to wait in select - _timeout.setNow(now); - long to_next_timeout=_timeout.getTimeToNext(); - - long wait = _changes.size()==0?__IDLE_TICK:0L; - if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) - wait = to_next_timeout; - - // If we should wait with a select - if (wait>0) - { - long before=now; - selector.select(wait); - now = System.currentTimeMillis(); - _timeout.setNow(now); - - // If we are monitoring for busy selector - // and this select did not wait more than 1ms - if (__MONITOR_PERIOD>0 && now-before <=1) - { - // count this as a busy select and if there have been too many this monitor cycle - if (++_busySelects>__MAX_SELECTS) - { - // Start injecting pauses - _pausing=true; - - // if this is the first pause - if (!_paused) - { - // Log and dump some status - _paused=true; - LOG.warn("Selector {} is too busy, pausing!",this); - } - } - } - } - } - - // have we been destroyed while sleeping - if (_selector==null || !selector.isOpen()) - return; - - // Look for things to do - for (SelectionKey key: selector.selectedKeys()) - { - SocketChannel channel=null; - try { if (!key.isValid()) { - key.cancel(); - SelectableAsyncEndPoint endpoint = (SelectableAsyncEndPoint)key.attachment(); - if (endpoint != null) - endpoint.doUpdateKey(); + if (debug) + LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); continue; } - Object att = key.attachment(); - if (att instanceof SelectableAsyncEndPoint) - { - if (key.isReadable()||key.isWritable()) - ((SelectableAsyncEndPoint)att).onSelected(); - } - else if (key.isConnectable()) - { - // Complete a connection of a registered channel - channel = (SocketChannel)key.channel(); - boolean connected=false; - try - { - connected=channel.finishConnect(); - } - catch(Exception e) - { - connectionFailed(channel,e,att); - } - finally - { - if (connected) - { - key.interestOps(SelectionKey.OP_READ); - AsyncEndPoint endpoint = createEndPoint(channel, key); - key.attach(endpoint); - // TODO: remove the cast - ((SelectableAsyncEndPoint)endpoint).onSelected(); - } - else - { - key.cancel(); - } - } - } - else - { - // Wrap readable registered channel in an endpoint - channel = (SocketChannel)key.channel(); - AsyncEndPoint endpoint = createEndPoint(channel, key); - key.attach(endpoint); - if (key.isReadable()) - { - // TODO: remove the cast - ((SelectableAsyncEndPoint)endpoint).onSelected(); - } - } - key = null; + processKey(key); } - catch (CancelledKeyException e) - { - LOG.ignore(e); - } - catch (Exception e) + catch (Exception x) { if (isRunning()) - LOG.warn(e); + LOG.warn(x); else - LOG.ignore(e); + LOG.debug(x); - try - { - if (channel!=null) - channel.close(); - } - catch(IOException e2) - { - LOG.debug(e2); - } - - if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) - key.cancel(); + execute(new Close(key)); } } // Everything always handled - selector.selectedKeys().clear(); - - now=System.currentTimeMillis(); - _timeout.setNow(now); - Task task = _timeout.expired(); - while (task!=null) - { - if (task instanceof Runnable) - dispatch((Runnable)task); - task = _timeout.expired(); - } - - // Idle tick - if (now-_idleTick>__IDLE_TICK) - { - _idleTick=now; - - final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) - ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) - :now; - - dispatch(new Runnable() - { - public void run() - { - for (AsyncEndPoint endp:_endPoints.keySet()) - { - // TODO: remove the cast - ((SelectableAsyncEndPoint)endp).checkForIdleOrReadWriteTimeout(idle_now); - } - } - public String toString() {return "Idle-"+super.toString();} - }); - - } - - // Reset busy select monitor counts - if (__MONITOR_PERIOD>0 && now>_monitorNext) - { - _busySelects=0; - _pausing=false; - _monitorNext=now+__MONITOR_PERIOD; - - } + selectedKeys.clear(); } - catch (ClosedSelectorException e) + catch (ClosedSelectorException x) { if (isRunning()) - LOG.warn(e); + LOG.warn(x); else - LOG.ignore(e); - } - catch (CancelledKeyException e) - { - LOG.ignore(e); - } - finally - { - _selecting=null; + LOG.ignore(x); } } + private void processChanges() + { + runChanges(); - /* ------------------------------------------------------------ */ - private void renewSelector() + // If tasks are submitted between these 2 statements, they will not + // wakeup the selector, therefore below we run again the tasks + + needsWakeup = true; + + // Run again the tasks to avoid the race condition where a task is + // submitted but will not wake up the selector + runChanges(); + } + + private void processKey(SelectionKey key) throws IOException { try { - synchronized (this) + Object att = key.attachment(); + if (att instanceof SelectableAsyncEndPoint) { - Selector selector=_selector; - if (selector==null) - return; - final Selector new_selector = Selector.open(); - for (SelectionKey k: selector.keys()) + if (key.isReadable() || key.isWritable()) + ((SelectableAsyncEndPoint)att).onSelected(); + } + else if (key.isConnectable()) + { + // Complete a connection of a registered channel + SocketChannel channel = (SocketChannel)key.channel(); + try { - if (!k.isValid() || k.interestOps()==0) - continue; - - final SelectableChannel channel = k.channel(); - final Object attachment = k.attachment(); - - if (attachment==null) - addChange(channel); + boolean connected = channel.finishConnect(); + if (connected) + { + AsyncEndPoint endpoint = createEndPoint(channel, key); + key.attach(endpoint); + } else - addChange(channel,attachment); + { + throw new ConnectException(); + } } - _selector.close(); - _selector=new_selector; + catch (Exception x) + { + connectionFailed(channel, x, att); + key.cancel(); + } + } + else + { + throw new IllegalStateException(); } } - catch(IOException e) + catch (CancelledKeyException x) { - throw new RuntimeException("recreating selector",e); + LOG.debug("Ignoring cancelled key for channel", key.channel()); } } - /* ------------------------------------------------------------ */ public SelectorManager getManager() { return SelectorManager.this; } - /* ------------------------------------------------------------ */ - public long getNow() - { - return _timeout.getNow(); - } - - /* ------------------------------------------------------------ */ - /** - * @param task The task to timeout. If it implements Runnable, then - * expired will be called from a dispatched thread. - * - * @param timeoutMs - */ - public void scheduleTimeout(Timeout.Task task, long timeoutMs) - { - if (!(task instanceof Runnable)) - throw new IllegalArgumentException("!Runnable"); - _timeout.schedule(task, timeoutMs); - } - - /* ------------------------------------------------------------ */ - public void cancelTimeout(Timeout.Task task) - { - task.cancel(); - } - - /* ------------------------------------------------------------ */ public void wakeup() { - try - { - Selector selector = _selector; - if (selector!=null) - selector.wakeup(); - } - catch(Exception e) - { - addChange(new ChangeTask() - { - public void run() - { - renewSelector(); - } - }); - - renewSelector(); - } + _selector.wakeup(); } - /* ------------------------------------------------------------ */ private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException { AsyncEndPoint endp = newEndPoint(channel, this, sKey); - LOG.debug("created {}",endp); + _endPoints.put(endp, this); + LOG.debug("Created {}", endp); endPointOpened(endp); - _endPoints.put(endp,this); return endp; } - /* ------------------------------------------------------------ */ + public void destroyEndPoint(SelectableAsyncEndPoint endp) { - LOG.debug("destroyEndPoint {}",endp); + LOG.debug("Destroyed {}", endp); _endPoints.remove(endp); - AsyncConnection connection=endp.getAsyncConnection(); - endp.onClose(); - if (connection!=null) - connection.onClose(); endPointClosed(endp); } - /* ------------------------------------------------------------ */ + // TODO: remove Selector getSelector() { return _selector; } - /* ------------------------------------------------------------ */ - void stop() throws Exception - { - // Spin for a while waiting for selector to complete - // to avoid unneccessary closed channel exceptions - try - { - for (int i=0;i<100 && _selecting!=null;i++) - { - wakeup(); - Thread.sleep(10); - } - } - catch(Exception e) - { - LOG.ignore(e); - } - - // close endpoints and selector - synchronized (this) - { - Selector selector=_selector; - for (SelectionKey key:selector.keys()) - { - if (key==null) - continue; - Object att=key.attachment(); - if (att instanceof EndPoint) - { - EndPoint endpoint = (EndPoint)att; - endpoint.close(); - } - } - - - _timeout.cancelAll(); - try - { - selector=_selector; - if (selector != null) - selector.close(); - } - catch (IOException e) - { - LOG.ignore(e); - } - _selector=null; - } - } - - /* ------------------------------------------------------------ */ public String dump() { return AggregateLifeCycle.dump(this); } - /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); + out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n"); - Thread selecting = _selecting; + Thread selecting = _thread; Object where = "not selecting"; StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); @@ -935,35 +499,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa Selector selector=_selector; if (selector!=null) { - final ArrayList dump = new ArrayList(selector.keys().size()*2); + final ArrayList dump = new ArrayList<>(selector.keys().size()*2); dump.add(where); - final CountDownLatch latch = new CountDownLatch(1); - - addChange(new ChangeTask() - { - public void run() - { - dumpKeyState(dump); - latch.countDown(); - } - }); - - try - { - latch.await(5,TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.ignore(e); - } + DumpKeys dumpKeys = new DumpKeys(dump); + submit(dumpKeys); + dumpKeys.await(5, TimeUnit.SECONDS); AggregateLifeCycle.dump(out,indent,dump); } } - /* ------------------------------------------------------------ */ - public void dumpKeyState(List dumpto) + + public void dumpKeysState(List dumpto) { Selector selector=_selector; Set keys = selector.keys(); @@ -977,7 +525,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } - /* ------------------------------------------------------------ */ public String toString() { Selector selector=_selector; @@ -986,40 +533,165 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa selector != null && selector.isOpen() ? selector.keys().size() : -1, selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); } - } - /* ------------------------------------------------------------ */ - private static class ChannelAndAttachment - { - final SelectableChannel _channel; - final Object _attachment; - - public ChannelAndAttachment(SelectableChannel channel, Object attachment) + private void timeoutCheck() { - super(); - _channel = channel; - _attachment = attachment; + long now = System.currentTimeMillis(); + for (AsyncEndPoint endPoint : _endPoints.keySet()) + { + // TODO: remove the cast + ((SelectableAsyncEndPoint)endPoint).checkForIdleOrReadWriteTimeout(now); + } + } + + private class DumpKeys implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + private final List _dumps; + + private DumpKeys(List dumps) + { + this._dumps = dumps; + } + + @Override + public void run() + { + dumpKeysState(_dumps); + latch.countDown(); + } + + public boolean await(long timeout, TimeUnit unit) + { + try + { + return latch.await(timeout, unit); + } + catch (InterruptedException x) + { + return false; + } + } + } + + private class Accept implements Runnable + { + private final SocketChannel _channel; + + public Accept(SocketChannel channel) + { + this._channel = channel; + } + + @Override + public void run() + { + try + { + SelectionKey key = _channel.register(_selector, 0, null); + AsyncEndPoint endpoint = createEndPoint(_channel, key); + key.attach(endpoint); + } + catch (IOException x) + { + LOG.debug(x); + } + } + } + + private class Connect implements Runnable + { + private final SocketChannel channel; + private final Object attachment; + + public Connect(SocketChannel channel, Object attachment) + { + this.channel = channel; + this.attachment = attachment; + } + + @Override + public void run() + { + try + { + channel.register(_selector, SelectionKey.OP_CONNECT, attachment); + } + catch (ClosedChannelException x) + { + LOG.debug(x); + } + } + } + + private class Close implements Runnable + { + private final SelectionKey key; + + private Close(SelectionKey key) + { + this.key = key; + } + + @Override + public void run() + { + try + { + key.channel().close(); + } + catch (IOException x) + { + LOG.ignore(x); + } + } + } + + private class Stop implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void run() + { + try + { + for (SelectionKey key : _selector.keys()) + { + Object attachment = key.attachment(); + if (attachment instanceof EndPoint) + { + EndPoint endpoint = (EndPoint)attachment; + endpoint.close(); + } + } + + _selector.close(); + } + catch (IOException x) + { + LOG.ignore(x); + } + finally + { + latch.countDown(); + } + } + + public boolean await(long timeout, TimeUnit unit) + { + try + { + return latch.await(timeout, unit); + } + catch (InterruptedException x) + { + return false; + } + } } } - /* ------------------------------------------------------------ */ - public boolean isDeferringInterestedOps0() - { - return _deferringInterestedOps0; - } - - /* ------------------------------------------------------------ */ - public void setDeferringInterestedOps0(boolean deferringInterestedOps0) - { - _deferringInterestedOps0 = deferringInterestedOps0; - } - - - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private interface ChangeTask extends Runnable - {} // TODO review this interface public interface SelectableAsyncEndPoint extends AsyncEndPoint @@ -1032,4 +704,5 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa void checkForIdleOrReadWriteTimeout(long idle_now); } + } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 44fc3cb716b..918acef2cca 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -82,7 +82,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); SSLEngine engine = __sslCtxFactory.newSslEngine(); engine.setUseClientMode(true); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 6c9f6e14b83..30ee65ae2b1 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -36,14 +36,8 @@ public class SelectChannelEndPointTest protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); private int maxIdleTimeout = 600000; // TODO: use smaller value - protected SelectorManager _manager = new SelectorManager() + protected SelectorManager _manager = new SelectorManager(_threadPool) { - @Override - public boolean dispatch(Runnable task) - { - return _threadPool.dispatch(task); - } - @Override protected void endPointClosed(AsyncEndPoint endpoint) { @@ -244,7 +238,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); // Write client to server client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); @@ -290,10 +284,8 @@ public class SelectChannelEndPointTest if (++i == 10) Assert.fail(); } - } - @Test public void testShutdown() throws Exception { @@ -304,7 +296,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); // Write client to server client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); @@ -357,7 +349,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); OutputStream clientOutputStream = client.getOutputStream(); InputStream clientInputStream = client.getInputStream(); @@ -411,7 +403,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); // Write client to server client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); @@ -459,7 +451,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); // Write client to server clientOutputStream.write("HelloWorld".getBytes("UTF-8")); @@ -515,7 +507,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); int writes = 100000; final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); @@ -602,7 +594,7 @@ public class SelectChannelEndPointTest SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.register(server); + _manager.accept(server); // Write client to server _writeCount=10000; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index e28201e0fd9..be5293eb5e3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -14,27 +14,16 @@ package org.eclipse.jetty.server; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.Socket; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicLong; -import javax.servlet.ServletRequest; - -import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.StandardByteBufferPool; -import org.eclipse.jetty.server.Connector.Statistics; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ThreadPool; /** * Abstract Connector implementation. This abstract implementation of the Connector interface provides: @@ -66,7 +55,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co private ByteBufferPool _byteBufferPool=new StandardByteBufferPool(); // TODO should this be server wide? or a thread local one? private final Statistics _stats = new ConnectionStatistics(); - + protected int _maxIdleTime = 200000; protected int _soLingerTime = -1; @@ -109,7 +98,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return getServer().getThreadPool(); return _executor; } - + /* ------------------------------------------------------------ */ @Override public Executor getExecutor() @@ -124,7 +113,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co _executor=executor; addBean(_executor); } - + /* ------------------------------------------------------------ */ @Override public ByteBufferPool getByteBufferPool() @@ -406,19 +395,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co { accept(_acceptor); } - catch (EofException e) + catch (IOException | InterruptedException e) { LOG.ignore(e); } - catch (IOException e) - { - LOG.ignore(e); - } - catch (InterruptedException x) - { - // Connector has been stopped - LOG.ignore(x); - } catch (Throwable e) { LOG.warn(e); @@ -457,6 +437,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co /* ------------------------------------------------------------ */ protected void connectionOpened(AsyncConnection connection) { + // TODO: should we dispatch the call to onOpen() to another thread ? connection.onOpen(); _stats.connectionOpened(); } @@ -472,34 +453,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co /* ------------------------------------------------------------ */ protected void connectionClosed(AsyncConnection connection) { - + // TODO: should we dispatch the call to onClose() to another thread ? + connection.onClose(); + long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); + // TODO: remove casts to HttpConnection int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; - _stats.connectionClosed(duration,requests,requests); - - } - - /* ------------------------------------------------------------ */ - /** - * @return the acceptorPriority - */ - public int getAcceptorPriorityOffset() - { - return _acceptorPriorityOffset; - } - - /* ------------------------------------------------------------ */ - /** - * Set the priority offset of the acceptor threads. The priority is adjusted by this amount (default 0) to either favour the acceptance of new threads and - * newly active connections or to favour the handling of already dispatched connections. - * - * @param offset - * the amount to alter the priority of the acceptor threads. - */ - public void setAcceptorPriorityOffset(int offset) - { - _acceptorPriorityOffset = offset; } /* ------------------------------------------------------------ */ @@ -520,17 +480,4 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co { _reuseAddress = reuseAddress; } - - - /* ------------------------------------------------------------ */ - void updateNotEqual(AtomicLong valueHolder, long compare, long value) - { - long oldValue = valueHolder.get(); - while (compare != oldValue) - { - if (valueHolder.compareAndSet(oldValue,value)) - break; - oldValue = valueHolder.get(); - } - } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index 717bc153f6a..47efaffefb0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -30,7 +30,6 @@ import org.eclipse.jetty.io.SelectorManager.SelectSet; import org.eclipse.jetty.server.AbstractHttpConnector; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.util.thread.ThreadPool; /* ------------------------------------------------------------------------------- */ /** @@ -60,11 +59,10 @@ import org.eclipse.jetty.util.thread.ThreadPool; */ public class SelectChannelConnector extends AbstractHttpConnector { + private SelectorManager _manager; protected ServerSocketChannel _acceptChannel; private int _localPort=-1; - private final SelectorManager _manager = new ConnectorSelectorManager(); - /* ------------------------------------------------------------------------------- */ /** * Constructor. @@ -93,7 +91,7 @@ public class SelectChannelConnector extends AbstractHttpConnector channel.configureBlocking(false); Socket socket = channel.socket(); configure(socket); - _manager.register(channel); + _manager.accept(channel); } } @@ -187,10 +185,10 @@ public class SelectChannelConnector extends AbstractHttpConnector @Override protected void doStart() throws Exception { - _manager.setSelectSets(getAcceptors()); - _manager.setMaxIdleTime(getMaxIdleTime()); - super.doStart(); + _manager = new ConnectorSelectorManager(findExecutor(), getAcceptors()); + _manager.setMaxIdleTime(getMaxIdleTime()); + _manager.start(); } /* ------------------------------------------------------------ */ @@ -204,6 +202,7 @@ public class SelectChannelConnector extends AbstractHttpConnector /* ------------------------------------------------------------------------------- */ protected void endPointClosed(AsyncEndPoint endpoint) { + endpoint.onClose(); connectionClosed(endpoint.getAsyncConnection()); } @@ -219,12 +218,9 @@ public class SelectChannelConnector extends AbstractHttpConnector /* ------------------------------------------------------------ */ private final class ConnectorSelectorManager extends SelectorManager { - @Override - public boolean dispatch(Runnable task) + private ConnectorSelectorManager(Executor executor, int selectSets) { - Executor executor = findExecutor(); - executor.execute(task); - return true; + super(executor, selectSets); } @Override @@ -242,22 +238,20 @@ public class SelectChannelConnector extends AbstractHttpConnector @Override protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) - { + { connectionUpgraded(oldConnection,endpoint.getAsyncConnection()); } @Override public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) - { + { return SelectChannelConnector.this.newConnection(channel,endpoint); } @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException - { + { return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); } - - } } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index c30bdd68b6f..e504273bf93 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLEngine; @@ -104,7 +103,7 @@ public class SPDYClient SessionPromise result = new SessionPromise(this, listener); channel.connect(address); - factory.selector.register(channel, result); + factory.selector.connect(channel, result); return result; } @@ -209,7 +208,7 @@ public class SPDYClient if (sslContextFactory != null) addBean(sslContextFactory); - selector = new ClientSelectorManager(); + selector = new ClientSelectorManager(threadPool); addBean(selector); factories.put("spdy/2", new ClientSPDYAsyncConnectionFactory()); @@ -267,18 +266,9 @@ public class SPDYClient private class ClientSelectorManager extends SelectorManager { - @Override - public boolean dispatch(Runnable task) + private ClientSelectorManager(Executor executor) { - try - { - threadPool.execute(task); - return true; - } - catch (RejectedExecutionException x) - { - return false; - } + super(executor); } @Override @@ -326,7 +316,7 @@ public class SPDYClient final AtomicReference sslEndPointRef = new AtomicReference<>(); final AtomicReference attachmentRef = new AtomicReference<>(attachment); SSLEngine engine = client.newSSLEngine(sslContextFactory, channel); - SslConnection sslConnection = new SslConnection(engine, endPoint) + SslConnection sslConnection = new SslConnection(bufferPool, threadPool, endPoint, engine) { @Override public void onClose() diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Name.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Name.java new file mode 100644 index 00000000000..0d67ddbee7f --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Name.java @@ -0,0 +1,13 @@ +package org.eclipse.jetty.util; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +public @interface Name +{ + String value(); +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java index e99b87555e2..fc91836c6e0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java @@ -4,11 +4,11 @@ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at +// The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. +// You may elect to redistribute this code under either of these licenses. // ======================================================================== package org.eclipse.jetty.util.component; @@ -20,8 +20,8 @@ import org.eclipse.jetty.util.log.Logger; /** * Basic implementation of the life cycle interface for components. - * - * + * + * */ public abstract class AbstractLifeCycle implements LifeCycle { @@ -32,12 +32,12 @@ public abstract class AbstractLifeCycle implements LifeCycle public static final String STARTED="STARTED"; public static final String STOPPING="STOPPING"; public static final String RUNNING="RUNNING"; - + + private final CopyOnWriteArrayList _listeners=new CopyOnWriteArrayList(); private final Object _lock = new Object(); private final int __FAILED = -1, __STOPPED = 0, __STARTING = 1, __STARTED = 2, __STOPPING = 3; private volatile int _state = __STOPPED; - - protected final CopyOnWriteArrayList _listeners=new CopyOnWriteArrayList(); + private long _stopTimeout = 10000; protected void doStart() throws Exception { @@ -59,12 +59,7 @@ public abstract class AbstractLifeCycle implements LifeCycle doStart(); setStarted(); } - catch (Exception e) - { - setFailed(e); - throw e; - } - catch (Error e) + catch (Throwable e) { setFailed(e); throw e; @@ -84,12 +79,7 @@ public abstract class AbstractLifeCycle implements LifeCycle doStop(); setStopped(); } - catch (Exception e) - { - setFailed(e); - throw e; - } - catch (Error e) + catch (Throwable e) { setFailed(e); throw e; @@ -100,7 +90,7 @@ public abstract class AbstractLifeCycle implements LifeCycle public boolean isRunning() { final int state = _state; - + return state == __STARTED || state == __STARTING; } @@ -138,7 +128,7 @@ public abstract class AbstractLifeCycle implements LifeCycle { _listeners.remove(listener); } - + public String getState() { switch(_state) @@ -151,7 +141,7 @@ public abstract class AbstractLifeCycle implements LifeCycle } return null; } - + public static String getState(LifeCycle lc) { if (lc.isStarting()) return STARTING; @@ -201,6 +191,16 @@ public abstract class AbstractLifeCycle implements LifeCycle listener.lifeCycleFailure(this,th); } + public long getStopTimeout() + { + return _stopTimeout; + } + + public void setStopTimeout(long stopTimeout) + { + this._stopTimeout = stopTimeout; + } + public static abstract class AbstractLifeCycleListener implements LifeCycle.Listener { public void lifeCycleFailure(LifeCycle event, Throwable cause) {}