diff --git a/VERSION.txt b/VERSION.txt index 8db351c6436..3547fb5dbf3 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1,6 +1,6 @@ jetty-7.2-SNAPSHOT - + 320264 dupliate mime entry + + 314087 Simplified SelectorManager + 319334 Concurrent, sharable ResourceCache + 319370 WebAppClassLoader.Context + 319444 Two nulls are appended to log statements from ContextHanler$Context diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index 730a53a01ae..24e714b8036 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.client; import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -43,8 +44,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, { private final HttpClient _httpClient; private final Manager _selectorManager=new Manager(); - private final Timeout _connectTimer = new Timeout(); - private final Map _connectingChannels = new ConcurrentHashMap(); private SSLContext _sslContext; private Buffers _sslBuffers; private boolean _blockingConnect; @@ -81,28 +80,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, { super.doStart(); - _connectTimer.setDuration(_httpClient.getConnectTimeout()); - _connectTimer.setNow(); - _httpClient._threadPool.dispatch(new Runnable() - { - public void run() - { - while (isRunning()) - { - _connectTimer.tick(System.currentTimeMillis()); - try - { - Thread.sleep(200); - } - catch (InterruptedException x) - { - Thread.currentThread().interrupt(); - break; - } - } - } - }); - _selectorManager.start(); final boolean direct=_httpClient.getUseDirectBuffers(); @@ -151,7 +128,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, @Override protected void doStop() throws Exception { - _connectTimer.cancelAll(); _selectorManager.stop(); } @@ -159,15 +135,24 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, public void startConnection( HttpDestination destination ) throws IOException { - SocketChannel channel = SocketChannel.open(); - Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress(); - channel.configureBlocking( false ); - channel.socket().setTcpNoDelay(true); - channel.connect(address.toSocketAddress()); - _selectorManager.register( channel, destination ); - ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination); - _connectTimer.schedule(connectTimeout); - _connectingChannels.put(channel, connectTimeout); + try + { + SocketChannel channel = SocketChannel.open(); + Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress(); + channel.configureBlocking( true ); + channel.socket().setTcpNoDelay(true); + channel.socket().setSoTimeout(_httpClient.getConnectTimeout()); + channel.connect(address.toSocketAddress()); + channel.configureBlocking(false); + channel.socket().setSoTimeout((int)_httpClient.getTimeout()); + + _selectorManager.register( channel, destination ); + } + catch(IOException ex) + { + destination.onConnectionFailed(ex); + } + } /* ------------------------------------------------------------ */ @@ -191,12 +176,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, /* ------------------------------------------------------------ */ class Manager extends SelectorManager { - @Override - protected SocketChannel acceptChannel(SelectionKey key) throws IOException - { - throw new IllegalStateException(); - } - @Override public boolean dispatch(Runnable task) { @@ -230,12 +209,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException { - // We're connected, cancel the connect timeout - Timeout.Task connectTimeout = _connectingChannels.remove(channel); - if (connectTimeout != null) - connectTimeout.cancel(); - Log.debug("Channels with connection pending: {}", _connectingChannels.size()); - // key should have destination at this point (will be replaced by endpoint after this call) HttpDestination dest=(HttpDestination)key.attachment(); @@ -278,19 +251,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, return sslEngine; } - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object) - */ - @Override - protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) - { - if (attachment instanceof HttpDestination) - ((HttpDestination)attachment).onConnectionFailed(ex); - else - super.connectionFailed(channel,ex,attachment); - } } private class ConnectTimeout extends Timeout.Task @@ -307,7 +267,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, @Override public void expired() { - _connectingChannels.remove(channel); if (channel.isConnectionPending()) { Log.debug("Channel {} timed out while connecting, closing it", channel); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java index d997cde77cb..3a862d0c486 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java @@ -343,6 +343,7 @@ public class HttpExchangeTest extends TestCase { ContentExchange httpExchange=new ContentExchange() { + }; //httpExchange.setURL(_scheme+"localhost:"+_port+"/"); httpExchange.setURL(_scheme+"localhost:"+_port); @@ -371,7 +372,7 @@ public class HttpExchangeTest extends TestCase try { - Thread.sleep(250); + Thread.sleep(25); } catch (InterruptedException e) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java index 5ee2f548be4..14e2ffd5372 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @@ -31,10 +31,15 @@ import org.eclipse.jetty.util.log.Log; /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint { private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; + private final Runnable _handler = new Runnable() + { + public void run() { handle(); } + }; + private volatile Connection _connection; private boolean _dispatched = false; private boolean _redispatched = false; @@ -70,7 +75,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, throws IOException { super(channel); - + _manager = selectSet.getManager(); _selectSet = selectSet; _dispatched = false; @@ -179,7 +184,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, _redispatched=true; else { - _dispatched = _manager.dispatch(this); + _dispatched = _manager.dispatch(_handler); if(!_dispatched) { Log.warn("Dispatched Failed!"); @@ -491,7 +496,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, /* ------------------------------------------------------------ */ /* */ - public void run() + private void handle() { boolean dispatched=true; try diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index a51a57a4a30..0e54bff30d6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -106,6 +106,7 @@ public abstract class SelectorManager extends AbstractLifeCycle { return _selectSet[i]; } + /* ------------------------------------------------------------ */ /** Register a channel * @param channel @@ -127,6 +128,29 @@ public abstract class SelectorManager extends AbstractLifeCycle set.wakeup(); } } + + + /* ------------------------------------------------------------ */ + /** Register a channel + * @param channel + * @param att Attached Object + */ + public void register(SocketChannel channel) + { + // The ++ increment here is not atomic, but it does not matter. + // so long as the value changes sometimes, then connections will + // be distributed over the available sets. + + int s=_set++; + s=s%_selectSets; + SelectSet[] sets=_selectSet; + if (sets!=null) + { + SelectSet set=sets[s]; + set.addChange(channel); + set.wakeup(); + } + } /* ------------------------------------------------------------ */ /** Register a {@link ServerSocketChannel} @@ -193,14 +217,6 @@ public abstract class SelectorManager extends AbstractLifeCycle sets[acceptorID].doSelect(); } - /* ------------------------------------------------------------ */ - /** - * @param key the selection key - * @return the SocketChannel created on accept - * @throws IOException - */ - protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException; - /* ------------------------------------------------------------------------------- */ public abstract boolean dispatch(Runnable task); @@ -265,13 +281,6 @@ public abstract class SelectorManager extends AbstractLifeCycle */ protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; - /* ------------------------------------------------------------------------------- */ - protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) - { - Log.warn(ex+","+channel+","+attachment); - Log.debug(ex); - } - /* ------------------------------------------------------------------------------- */ public void dump() { @@ -291,7 +300,7 @@ public abstract class SelectorManager extends AbstractLifeCycle } } - set.addChange(new ChangeTask(){ + set.addChange(new Runnable(){ public void run() { set.dump(); @@ -313,7 +322,6 @@ public abstract class SelectorManager extends AbstractLifeCycle private Selector _selector; - private int _nextSet; private volatile Thread _selecting; private int _jvmBug; private int _selects; @@ -347,11 +355,11 @@ public abstract class SelectorManager extends AbstractLifeCycle } /* ------------------------------------------------------------ */ - public void addChange(Object point) + public void addChange(Object change) { - _changes.add(point); + _changes.add(change); } - + /* ------------------------------------------------------------ */ public void addChange(SelectableChannel channel, Object att) { @@ -360,7 +368,7 @@ public abstract class SelectorManager extends AbstractLifeCycle else if (att instanceof EndPoint) addChange(att); else - addChange(new ChangeSelectableChannel(channel,att)); + addChange(new ChannelAndAttachment(channel,att)); } /* ------------------------------------------------------------ */ @@ -389,53 +397,29 @@ public abstract class SelectorManager extends AbstractLifeCycle SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; endpoint.doUpdateKey(); } - else if (change instanceof Runnable) - { - dispatch((Runnable)change); - } - else if (change instanceof ChangeSelectableChannel) + else if (change instanceof ChannelAndAttachment) { // finish accepting/connecting this connection - final ChangeSelectableChannel asc = (ChangeSelectableChannel)change; + final ChannelAndAttachment asc = (ChannelAndAttachment)change; final SelectableChannel channel=asc._channel; final Object att = asc._attachment; - - if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) - { - SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); - SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (channel.isOpen()) - { - channel.register(selector,SelectionKey.OP_CONNECT,att); - } + SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); + SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); + key.attach(endpoint); + endpoint.schedule(); } else if (change instanceof SocketChannel) { + // Newly registered channel final SocketChannel channel=(SocketChannel)change; - - if (channel.isConnected()) - { - SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (channel.isOpen()) - { - channel.register(selector,SelectionKey.OP_CONNECT,null); - } + SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + endpoint.schedule(); } - else if (change instanceof ServerSocketChannel) + else if (change instanceof Runnable) { - ServerSocketChannel channel = (ServerSocketChannel)change; - channel.register(getSelector(),SelectionKey.OP_ACCEPT); - } - else if (change instanceof ChangeTask) - { - ((ChangeTask)change).run(); + dispatch((Runnable)change); } else throw new IllegalArgumentException(change.toString()); @@ -449,19 +433,15 @@ public abstract class SelectorManager extends AbstractLifeCycle } } - long retry_next; + + // Do and instant select to see if any connections can be handled. + int selected=selector.selectNow(); + _selects++; + long now=System.currentTimeMillis(); - _timeout.setNow(now); - - retry_next=_timeout.getTimeToNext(); - - // workout how low to wait in select - long wait = _changes.size()==0?__IDLE_TICK:0L; - if (wait > 0 && retry_next >= 0 && wait > retry_next) - wait = retry_next; - - // Do the select. - if (wait > 0) + + // if no immediate things to do + if (selected==0) { // If we are in pausing mode if (_pausing) @@ -474,160 +454,29 @@ public abstract class SelectorManager extends AbstractLifeCycle { Log.ignore(e); } + now=System.currentTimeMillis(); } - - long before=now; - int selected=selector.select(wait); - now = System.currentTimeMillis(); + + // workout how long to wait in select _timeout.setNow(now); - _selects++; - - // Look for JVM bugs over a monitor period. - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 - // http://bugs.sun.com/view_bug.do?bug_id=6693490 - if (now>_monitorNext) - { - _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart)); - _pausing=_selects>__MAX_SELECTS; - if (_pausing) - _paused++; - - _selects=0; - _jvmBug=0; - _monitorStart=now; - _monitorNext=now+__MONITOR_PERIOD; - } - - if (now>_log) - { - if (_paused>0) - Log.debug(this+" Busy selector - injecting delay "+_paused+" times"); + long to_next_timeout=_timeout.getTimeToNext(); - if (_jvmFix2>0) - Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); + long wait = _changes.size()==0?__IDLE_TICK:0L; + if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) + wait = to_next_timeout; - if (_jvmFix1>0) - Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); - - else if(Log.isDebugEnabled() && _jvmFix0>0) - Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times"); - _paused=0; - _jvmFix2=0; - _jvmFix1=0; - _jvmFix0=0; - _log=now+60000; - } - - // If we see signature of possible JVM bug, increment count. - if (selected==0 && wait>10 && (now-before)<(wait/2)) + // If we should wait with a select + if (wait>0) { - // Increment bug count and try a work around - _jvmBug++; - if (_jvmBug>(__JVMBUG_THRESHHOLD)) - { - try - { - if (_jvmBug==__JVMBUG_THRESHHOLD+1) - _jvmFix2++; - - Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop - } - catch(InterruptedException e) - { - Log.ignore(e); - } - } - else if (_jvmBug==__JVMBUG_THRESHHOLD) - { - synchronized (this) - { - // BLOODY SUN BUG !!! Try refreshing the entire selector. - final Selector new_selector = Selector.open(); - for (SelectionKey k: selector.keys()) - { - if (!k.isValid() || k.interestOps()==0) - continue; - - final SelectableChannel channel = k.channel(); - final Object attachment = k.attachment(); - - if (attachment==null) - addChange(channel); - else - addChange(channel,attachment); - } - Selector old_selector=_selector; - _selector=new_selector; - try - { - old_selector.close(); - } - catch(Exception e) - { - Log.ignore(e); - } - return; - } - } - else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops - { - // Cancel keys with 0 interested ops - int cancelled=0; - for (SelectionKey k: selector.keys()) - { - if (k.isValid()&&k.interestOps()==0) - { - k.cancel(); - cancelled++; - } - } - if (cancelled>0) - _jvmFix0++; - - return; - } - } - else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS) - { - // Look for busy key - SelectionKey busy = selector.selectedKeys().iterator().next(); - if (busy==_busyKey) - { - if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel)) - { - final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment(); - Log.warn("Busy Key "+busy.channel()+" "+endpoint); - busy.cancel(); - if (endpoint!=null) - { - dispatch(new Runnable() - { - public void run() - { - try - { - endpoint.close(); - } - catch (IOException e) - { - Log.ignore(e); - } - } - }); - } - } - } - else - _busyKeyCount=0; - _busyKey=busy; + long before=now; + selected=selector.select(wait); + _selects++; + now = System.currentTimeMillis(); + _timeout.setNow(now); + checkJvmBugs(before, now, wait, selected); } } - else - { - selector.selectNow(); - _selects++; - } - + // have we been destroyed while sleeping if (_selector==null || !selector.isOpen()) return; @@ -651,63 +500,6 @@ public abstract class SelectorManager extends AbstractLifeCycle { ((SelectChannelEndPoint)att).schedule(); } - else if (key.isAcceptable()) - { - SocketChannel channel = acceptChannel(key); - if (channel==null) - continue; - - channel.configureBlocking(false); - - // TODO make it reluctant to leave 0 - _nextSet=++_nextSet%_selectSet.length; - - // Is this for this selectset - if (_nextSet==_setID) - { - // bind connections to this select set. - SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ); - - SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey); - cKey.attach(endpoint); - if (endpoint != null) - endpoint.schedule(); - } - else - { - // nope - give it to another. - _selectSet[_nextSet].addChange(channel); - _selectSet[_nextSet].wakeup(); - } - } - else if (key.isConnectable()) - { - // Complete a connection of a registered channel - SocketChannel channel = (SocketChannel)key.channel(); - boolean connected=false; - try - { - connected=channel.finishConnect(); - } - catch(Exception e) - { - connectionFailed(channel,e,att); - } - finally - { - if (connected) - { - key.interestOps(SelectionKey.OP_READ); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else - { - key.cancel(); - } - } - } else { // Wrap readable registered channel in an endpoint @@ -738,15 +530,13 @@ public abstract class SelectorManager extends AbstractLifeCycle // Everything always handled selector.selectedKeys().clear(); + now=System.currentTimeMillis(); _timeout.setNow(now); Task task = _timeout.expired(); while (task!=null) { if (task instanceof Runnable) dispatch((Runnable)task); - else - task.expired(); - task = _timeout.expired(); } @@ -780,7 +570,148 @@ public abstract class SelectorManager extends AbstractLifeCycle _selecting=null; } } + + /* ------------------------------------------------------------ */ + private void checkJvmBugs(long before, long now, long wait, int selected) + throws IOException + { + Selector selector = _selector; + if (selector==null) + return; + + // Look for JVM bugs over a monitor period. + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 + // http://bugs.sun.com/view_bug.do?bug_id=6693490 + if (now>_monitorNext) + { + _selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart)); + _pausing=_selects>__MAX_SELECTS; + if (_pausing) + _paused++; + _selects=0; + _jvmBug=0; + _monitorStart=now; + _monitorNext=now+__MONITOR_PERIOD; + } + + if (now>_log) + { + if (_paused>0) + Log.debug(this+" Busy selector - injecting delay "+_paused+" times"); + + if (_jvmFix2>0) + Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); + + if (_jvmFix1>0) + Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); + + else if(Log.isDebugEnabled() && _jvmFix0>0) + Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times"); + _paused=0; + _jvmFix2=0; + _jvmFix1=0; + _jvmFix0=0; + _log=now+60000; + } + + // If we see signature of possible JVM bug, increment count. + if (selected==0 && wait>10 && (now-before)<(wait/2)) + { + // Increment bug count and try a work around + _jvmBug++; + if (_jvmBug>(__JVMBUG_THRESHHOLD)) + { + try + { + if (_jvmBug==__JVMBUG_THRESHHOLD+1) + _jvmFix2++; + + Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop + } + catch(InterruptedException e) + { + Log.ignore(e); + } + } + else if (_jvmBug==__JVMBUG_THRESHHOLD) + { + synchronized (this) + { + // BLOODY SUN BUG !!! Try refreshing the entire selector. + final Selector new_selector = Selector.open(); + for (SelectionKey k: selector.keys()) + { + if (!k.isValid() || k.interestOps()==0) + continue; + + final SelectableChannel channel = k.channel(); + final Object attachment = k.attachment(); + + if (attachment==null) + addChange(channel); + else + addChange(channel,attachment); + } + _selector.close(); + _selector=new_selector; + return; + } + } + else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops + { + // Cancel keys with 0 interested ops + int cancelled=0; + for (SelectionKey k: selector.keys()) + { + if (k.isValid()&&k.interestOps()==0) + { + k.cancel(); + cancelled++; + } + } + if (cancelled>0) + _jvmFix0++; + + return; + } + } + else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS) + { + // Look for busy key + SelectionKey busy = selector.selectedKeys().iterator().next(); + if (busy==_busyKey) + { + if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel)) + { + final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment(); + Log.warn("Busy Key "+busy.channel()+" "+endpoint); + busy.cancel(); + if (endpoint!=null) + { + dispatch(new Runnable() + { + public void run() + { + try + { + endpoint.close(); + } + catch (IOException e) + { + Log.ignore(e); + } + } + }); + } + } + } + else + _busyKeyCount=0; + _busyKey=busy; + } + } + /* ------------------------------------------------------------ */ public SelectorManager getManager() { @@ -802,6 +733,8 @@ public abstract class SelectorManager extends AbstractLifeCycle */ public void scheduleTimeout(Timeout.Task task, long timeoutMs) { + if (!(task instanceof Runnable)) + throw new IllegalArgumentException("!Runnable"); _timeout.schedule(task, timeoutMs); } @@ -911,22 +844,16 @@ public abstract class SelectorManager extends AbstractLifeCycle } /* ------------------------------------------------------------ */ - private static class ChangeSelectableChannel + private static class ChannelAndAttachment { final SelectableChannel _channel; final Object _attachment; - public ChangeSelectableChannel(SelectableChannel channel, Object attachment) + public ChannelAndAttachment(SelectableChannel channel, Object attachment) { super(); _channel = channel; _attachment = attachment; } } - - /* ------------------------------------------------------------ */ - private interface ChangeTask - { - public void run(); - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java index b9944a288e4..fa5e1b50b45 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java @@ -90,60 +90,56 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint if (_debug) __log.debug(_session+" channel="+channel); } + + int _outCount; + /* ------------------------------------------------------------ */ private void needOutBuffer() { - if (_outNIOBuffer==null) + synchronized (this) { - synchronized (this) - { - if (_outNIOBuffer==null) - _outNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize()); - } - } - } - - /* ------------------------------------------------------------ */ - private void needInBuffer() - { - if (_inNIOBuffer==null) - { - synchronized (this) - { - if(_inNIOBuffer==null) - _inNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize()); - } + _outCount++; + if (_outNIOBuffer==null) + _outNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize()); } } /* ------------------------------------------------------------ */ private void freeOutBuffer() { - if (_outNIOBuffer!=null && _outNIOBuffer.length()==0) - { - synchronized (this) - { - if (_outNIOBuffer!=null && _outNIOBuffer.length()==0) - { - _buffers.returnBuffer(_outNIOBuffer); - _outNIOBuffer=null; - } + synchronized (this) + { + if (--_outCount<=0 && _outNIOBuffer!=null && _outNIOBuffer.length()==0) + { + _buffers.returnBuffer(_outNIOBuffer); + _outNIOBuffer=null; + _outCount=0; } } } + + int _inCount; + /* ------------------------------------------------------------ */ + private void needInBuffer() + { + synchronized (this) + { + _inCount++; + if(_inNIOBuffer==null) + _inNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize()); + } + } /* ------------------------------------------------------------ */ private void freeInBuffer() { - if (_inNIOBuffer!=null && _inNIOBuffer.length()==0) - { - synchronized (this) - { - if (_inNIOBuffer!=null && _inNIOBuffer.length()==0) - { - _buffers.returnBuffer(_inNIOBuffer); - _inNIOBuffer=null; - } + synchronized (this) + { + if (--_inCount<=0 &&_inNIOBuffer!=null && _inNIOBuffer.length()==0) + { + _buffers.returnBuffer(_inNIOBuffer); + _inNIOBuffer=null; + _inCount=0; } } } @@ -184,10 +180,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint long end=System.currentTimeMillis()+((SocketChannel)_channel).socket().getSoTimeout(); try { - if (isBufferingOutput()) + while (isOpen() && isBufferingOutput()&& System.currentTimeMillis()0) + flush(); _outNIOBuffer.compact(); int put=_outNIOBuffer.putIndex(); out_buffer.position(put); @@ -271,9 +266,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint } } } - catch (InterruptedException e) + catch (Exception e) { - Log.ignore(e); + Log.debug(e); + super.close(); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java index c677d86df2d..7e366590795 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ProxyHandler.java @@ -411,13 +411,6 @@ public class ProxyHandler extends HandlerWrapper private class Manager extends SelectorManager { - @Override - protected SocketChannel acceptChannel(SelectionKey key) throws IOException - { - // This is a client-side selector manager - throw new IllegalStateException(); - } - @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index d1f63cfb55d..acaf064f7ad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -67,19 +67,6 @@ public class SelectChannelConnector extends AbstractNIOConnector private final SelectorManager _manager = new SelectorManager() { - @Override - protected SocketChannel acceptChannel(SelectionKey key) throws IOException - { - // TODO handle max connections - SocketChannel channel = ((ServerSocketChannel)key.channel()).accept(); - if (channel==null) - return null; - channel.configureBlocking(false); - Socket socket = channel.socket(); - configure(socket); - return channel; - } - @Override public boolean dispatch(Runnable task) { @@ -212,9 +199,6 @@ public class SelectChannelConnector extends AbstractNIOConnector if (_localPort<=0) throw new IOException("Server channel not bound"); - // Set to non blocking mode - _acceptChannel.configureBlocking(false); - } } } @@ -287,8 +271,32 @@ public class SelectChannelConnector extends AbstractNIOConnector _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); _manager.start(); open(); - _manager.register(_acceptChannel); super.doStart(); + + // start a thread to accept new connections + _manager.dispatch(new Runnable() + { + public void run() + { + final ServerSocketChannel server=_acceptChannel; + while (isRunning() && _acceptChannel==server && server.isOpen()) + { + try + { + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + Socket socket = channel.socket(); + configure(socket); + _manager.register(channel); + } + catch(IOException e) + { + Log.ignore(e); + } + } + } + }); + } /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java index 10c318c6654..47980aada54 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BusySelectChannelServerTest.java @@ -33,7 +33,7 @@ public class BusySelectChannelServerTest extends HttpServerTestBase @BeforeClass public static void init() throws Exception { - startServer(new SelectChannelConnector() + SelectChannelConnector connector=new SelectChannelConnector() { @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException @@ -128,6 +128,8 @@ public class BusySelectChannelServerTest extends HttpServerTestBase } }; } - }); + }; + connector.setAcceptors(1); + startServer(connector); } }