314087 Simplified SelectorManager

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2189 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2010-07-28 13:11:40 +00:00
parent bd272143e2
commit ef9ebf31b6
9 changed files with 315 additions and 424 deletions

View File

@ -1,6 +1,6 @@
jetty-7.2-SNAPSHOT jetty-7.2-SNAPSHOT
+ 320264 dupliate mime entry + 314087 Simplified SelectorManager
+ 319334 Concurrent, sharable ResourceCache + 319334 Concurrent, sharable ResourceCache
+ 319370 WebAppClassLoader.Context + 319370 WebAppClassLoader.Context
+ 319444 Two nulls are appended to log statements from ContextHanler$Context + 319444 Two nulls are appended to log statements from ContextHanler$Context

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.client;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -43,8 +44,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{ {
private final HttpClient _httpClient; private final HttpClient _httpClient;
private final Manager _selectorManager=new Manager(); private final Manager _selectorManager=new Manager();
private final Timeout _connectTimer = new Timeout();
private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
private SSLContext _sslContext; private SSLContext _sslContext;
private Buffers _sslBuffers; private Buffers _sslBuffers;
private boolean _blockingConnect; private boolean _blockingConnect;
@ -81,28 +80,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
{ {
super.doStart(); super.doStart();
_connectTimer.setDuration(_httpClient.getConnectTimeout());
_connectTimer.setNow();
_httpClient._threadPool.dispatch(new Runnable()
{
public void run()
{
while (isRunning())
{
_connectTimer.tick(System.currentTimeMillis());
try
{
Thread.sleep(200);
}
catch (InterruptedException x)
{
Thread.currentThread().interrupt();
break;
}
}
}
});
_selectorManager.start(); _selectorManager.start();
final boolean direct=_httpClient.getUseDirectBuffers(); final boolean direct=_httpClient.getUseDirectBuffers();
@ -151,7 +128,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
_connectTimer.cancelAll();
_selectorManager.stop(); _selectorManager.stop();
} }
@ -159,15 +135,24 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
public void startConnection( HttpDestination destination ) public void startConnection( HttpDestination destination )
throws IOException throws IOException
{ {
SocketChannel channel = SocketChannel.open(); try
Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress(); {
channel.configureBlocking( false ); SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true); Address address = destination.isProxied() ? destination.getProxy() : destination.getAddress();
channel.connect(address.toSocketAddress()); channel.configureBlocking( true );
_selectorManager.register( channel, destination ); channel.socket().setTcpNoDelay(true);
ConnectTimeout connectTimeout = new ConnectTimeout(channel, destination); channel.socket().setSoTimeout(_httpClient.getConnectTimeout());
_connectTimer.schedule(connectTimeout); channel.connect(address.toSocketAddress());
_connectingChannels.put(channel, connectTimeout); channel.configureBlocking(false);
channel.socket().setSoTimeout((int)_httpClient.getTimeout());
_selectorManager.register( channel, destination );
}
catch(IOException ex)
{
destination.onConnectionFailed(ex);
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -191,12 +176,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
class Manager extends SelectorManager class Manager extends SelectorManager
{ {
@Override
protected SocketChannel acceptChannel(SelectionKey key) throws IOException
{
throw new IllegalStateException();
}
@Override @Override
public boolean dispatch(Runnable task) public boolean dispatch(Runnable task)
{ {
@ -230,12 +209,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{ {
// We're connected, cancel the connect timeout
Timeout.Task connectTimeout = _connectingChannels.remove(channel);
if (connectTimeout != null)
connectTimeout.cancel();
Log.debug("Channels with connection pending: {}", _connectingChannels.size());
// key should have destination at this point (will be replaced by endpoint after this call) // key should have destination at this point (will be replaced by endpoint after this call)
HttpDestination dest=(HttpDestination)key.attachment(); HttpDestination dest=(HttpDestination)key.attachment();
@ -278,19 +251,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
return sslEngine; return sslEngine;
} }
/* ------------------------------------------------------------ */
/* (non-Javadoc)
* @see org.eclipse.io.nio.SelectorManager#connectionFailed(java.nio.channels.SocketChannel, java.lang.Throwable, java.lang.Object)
*/
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (attachment instanceof HttpDestination)
((HttpDestination)attachment).onConnectionFailed(ex);
else
super.connectionFailed(channel,ex,attachment);
}
} }
private class ConnectTimeout extends Timeout.Task private class ConnectTimeout extends Timeout.Task
@ -307,7 +267,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
@Override @Override
public void expired() public void expired()
{ {
_connectingChannels.remove(channel);
if (channel.isConnectionPending()) if (channel.isConnectionPending())
{ {
Log.debug("Channel {} timed out while connecting, closing it", channel); Log.debug("Channel {} timed out while connecting, closing it", channel);

View File

@ -343,6 +343,7 @@ public class HttpExchangeTest extends TestCase
{ {
ContentExchange httpExchange=new ContentExchange() ContentExchange httpExchange=new ContentExchange()
{ {
}; };
//httpExchange.setURL(_scheme+"localhost:"+_port+"/"); //httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"localhost:"+_port); httpExchange.setURL(_scheme+"localhost:"+_port);
@ -371,7 +372,7 @@ public class HttpExchangeTest extends TestCase
try try
{ {
Thread.sleep(250); Thread.sleep(25);
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {

View File

@ -31,10 +31,15 @@ import org.eclipse.jetty.util.log.Log;
/** /**
* An Endpoint that can be scheduled by {@link SelectorManager}. * An Endpoint that can be scheduled by {@link SelectorManager}.
*/ */
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{ {
private final SelectorManager.SelectSet _selectSet; private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager; private final SelectorManager _manager;
private final Runnable _handler = new Runnable()
{
public void run() { handle(); }
};
private volatile Connection _connection; private volatile Connection _connection;
private boolean _dispatched = false; private boolean _dispatched = false;
private boolean _redispatched = false; private boolean _redispatched = false;
@ -70,7 +75,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
throws IOException throws IOException
{ {
super(channel); super(channel);
_manager = selectSet.getManager(); _manager = selectSet.getManager();
_selectSet = selectSet; _selectSet = selectSet;
_dispatched = false; _dispatched = false;
@ -179,7 +184,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_redispatched=true; _redispatched=true;
else else
{ {
_dispatched = _manager.dispatch(this); _dispatched = _manager.dispatch(_handler);
if(!_dispatched) if(!_dispatched)
{ {
Log.warn("Dispatched Failed!"); Log.warn("Dispatched Failed!");
@ -491,7 +496,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
*/ */
public void run() private void handle()
{ {
boolean dispatched=true; boolean dispatched=true;
try try

View File

@ -106,6 +106,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
{ {
return _selectSet[i]; return _selectSet[i];
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Register a channel /** Register a channel
* @param channel * @param channel
@ -127,6 +128,29 @@ public abstract class SelectorManager extends AbstractLifeCycle
set.wakeup(); set.wakeup();
} }
} }
/* ------------------------------------------------------------ */
/** Register a channel
* @param channel
* @param att Attached Object
*/
public void register(SocketChannel channel)
{
// The ++ increment here is not atomic, but it does not matter.
// so long as the value changes sometimes, then connections will
// be distributed over the available sets.
int s=_set++;
s=s%_selectSets;
SelectSet[] sets=_selectSet;
if (sets!=null)
{
SelectSet set=sets[s];
set.addChange(channel);
set.wakeup();
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Register a {@link ServerSocketChannel} /** Register a {@link ServerSocketChannel}
@ -193,14 +217,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
sets[acceptorID].doSelect(); sets[acceptorID].doSelect();
} }
/* ------------------------------------------------------------ */
/**
* @param key the selection key
* @return the SocketChannel created on accept
* @throws IOException
*/
protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public abstract boolean dispatch(Runnable task); public abstract boolean dispatch(Runnable task);
@ -265,13 +281,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
*/ */
protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
/* ------------------------------------------------------------------------------- */
protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
{
Log.warn(ex+","+channel+","+attachment);
Log.debug(ex);
}
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
public void dump() public void dump()
{ {
@ -291,7 +300,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
} }
} }
set.addChange(new ChangeTask(){ set.addChange(new Runnable(){
public void run() public void run()
{ {
set.dump(); set.dump();
@ -313,7 +322,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
private Selector _selector; private Selector _selector;
private int _nextSet;
private volatile Thread _selecting; private volatile Thread _selecting;
private int _jvmBug; private int _jvmBug;
private int _selects; private int _selects;
@ -347,11 +355,11 @@ public abstract class SelectorManager extends AbstractLifeCycle
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void addChange(Object point) public void addChange(Object change)
{ {
_changes.add(point); _changes.add(change);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void addChange(SelectableChannel channel, Object att) public void addChange(SelectableChannel channel, Object att)
{ {
@ -360,7 +368,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
else if (att instanceof EndPoint) else if (att instanceof EndPoint)
addChange(att); addChange(att);
else else
addChange(new ChangeSelectableChannel(channel,att)); addChange(new ChannelAndAttachment(channel,att));
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -389,53 +397,29 @@ public abstract class SelectorManager extends AbstractLifeCycle
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
endpoint.doUpdateKey(); endpoint.doUpdateKey();
} }
else if (change instanceof Runnable) else if (change instanceof ChannelAndAttachment)
{
dispatch((Runnable)change);
}
else if (change instanceof ChangeSelectableChannel)
{ {
// finish accepting/connecting this connection // finish accepting/connecting this connection
final ChangeSelectableChannel asc = (ChangeSelectableChannel)change; final ChannelAndAttachment asc = (ChannelAndAttachment)change;
final SelectableChannel channel=asc._channel; final SelectableChannel channel=asc._channel;
final Object att = asc._attachment; final Object att = asc._attachment;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
{ key.attach(endpoint);
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); endpoint.schedule();
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (channel.isOpen())
{
channel.register(selector,SelectionKey.OP_CONNECT,att);
}
} }
else if (change instanceof SocketChannel) else if (change instanceof SocketChannel)
{ {
// Newly registered channel
final SocketChannel channel=(SocketChannel)change; final SocketChannel channel=(SocketChannel)change;
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
if (channel.isConnected()) SelectChannelEndPoint endpoint = createEndPoint(channel,key);
{ key.attach(endpoint);
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null); endpoint.schedule();
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else if (channel.isOpen())
{
channel.register(selector,SelectionKey.OP_CONNECT,null);
}
} }
else if (change instanceof ServerSocketChannel) else if (change instanceof Runnable)
{ {
ServerSocketChannel channel = (ServerSocketChannel)change; dispatch((Runnable)change);
channel.register(getSelector(),SelectionKey.OP_ACCEPT);
}
else if (change instanceof ChangeTask)
{
((ChangeTask)change).run();
} }
else else
throw new IllegalArgumentException(change.toString()); throw new IllegalArgumentException(change.toString());
@ -449,19 +433,15 @@ public abstract class SelectorManager extends AbstractLifeCycle
} }
} }
long retry_next;
// Do and instant select to see if any connections can be handled.
int selected=selector.selectNow();
_selects++;
long now=System.currentTimeMillis(); long now=System.currentTimeMillis();
_timeout.setNow(now);
// if no immediate things to do
retry_next=_timeout.getTimeToNext(); if (selected==0)
// workout how low to wait in select
long wait = _changes.size()==0?__IDLE_TICK:0L;
if (wait > 0 && retry_next >= 0 && wait > retry_next)
wait = retry_next;
// Do the select.
if (wait > 0)
{ {
// If we are in pausing mode // If we are in pausing mode
if (_pausing) if (_pausing)
@ -474,160 +454,29 @@ public abstract class SelectorManager extends AbstractLifeCycle
{ {
Log.ignore(e); Log.ignore(e);
} }
now=System.currentTimeMillis();
} }
long before=now; // workout how long to wait in select
int selected=selector.select(wait);
now = System.currentTimeMillis();
_timeout.setNow(now); _timeout.setNow(now);
_selects++; long to_next_timeout=_timeout.getTimeToNext();
// Look for JVM bugs over a monitor period.
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
// http://bugs.sun.com/view_bug.do?bug_id=6693490
if (now>_monitorNext)
{
_selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
_pausing=_selects>__MAX_SELECTS;
if (_pausing)
_paused++;
_selects=0;
_jvmBug=0;
_monitorStart=now;
_monitorNext=now+__MONITOR_PERIOD;
}
if (now>_log)
{
if (_paused>0)
Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
if (_jvmFix2>0) long wait = _changes.size()==0?__IDLE_TICK:0L;
Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times"); if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
wait = to_next_timeout;
if (_jvmFix1>0) // If we should wait with a select
Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times"); if (wait>0)
else if(Log.isDebugEnabled() && _jvmFix0>0)
Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
_paused=0;
_jvmFix2=0;
_jvmFix1=0;
_jvmFix0=0;
_log=now+60000;
}
// If we see signature of possible JVM bug, increment count.
if (selected==0 && wait>10 && (now-before)<(wait/2))
{ {
// Increment bug count and try a work around long before=now;
_jvmBug++; selected=selector.select(wait);
if (_jvmBug>(__JVMBUG_THRESHHOLD)) _selects++;
{ now = System.currentTimeMillis();
try _timeout.setNow(now);
{ checkJvmBugs(before, now, wait, selected);
if (_jvmBug==__JVMBUG_THRESHHOLD+1)
_jvmFix2++;
Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
}
catch(InterruptedException e)
{
Log.ignore(e);
}
}
else if (_jvmBug==__JVMBUG_THRESHHOLD)
{
synchronized (this)
{
// BLOODY SUN BUG !!! Try refreshing the entire selector.
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
Selector old_selector=_selector;
_selector=new_selector;
try
{
old_selector.close();
}
catch(Exception e)
{
Log.ignore(e);
}
return;
}
}
else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
{
// Cancel keys with 0 interested ops
int cancelled=0;
for (SelectionKey k: selector.keys())
{
if (k.isValid()&&k.interestOps()==0)
{
k.cancel();
cancelled++;
}
}
if (cancelled>0)
_jvmFix0++;
return;
}
}
else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
{
// Look for busy key
SelectionKey busy = selector.selectedKeys().iterator().next();
if (busy==_busyKey)
{
if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
{
final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
Log.warn("Busy Key "+busy.channel()+" "+endpoint);
busy.cancel();
if (endpoint!=null)
{
dispatch(new Runnable()
{
public void run()
{
try
{
endpoint.close();
}
catch (IOException e)
{
Log.ignore(e);
}
}
});
}
}
}
else
_busyKeyCount=0;
_busyKey=busy;
} }
} }
else
{
selector.selectNow();
_selects++;
}
// have we been destroyed while sleeping // have we been destroyed while sleeping
if (_selector==null || !selector.isOpen()) if (_selector==null || !selector.isOpen())
return; return;
@ -651,63 +500,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
{ {
((SelectChannelEndPoint)att).schedule(); ((SelectChannelEndPoint)att).schedule();
} }
else if (key.isAcceptable())
{
SocketChannel channel = acceptChannel(key);
if (channel==null)
continue;
channel.configureBlocking(false);
// TODO make it reluctant to leave 0
_nextSet=++_nextSet%_selectSet.length;
// Is this for this selectset
if (_nextSet==_setID)
{
// bind connections to this select set.
SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
cKey.attach(endpoint);
if (endpoint != null)
endpoint.schedule();
}
else
{
// nope - give it to another.
_selectSet[_nextSet].addChange(channel);
_selectSet[_nextSet].wakeup();
}
}
else if (key.isConnectable())
{
// Complete a connection of a registered channel
SocketChannel channel = (SocketChannel)key.channel();
boolean connected=false;
try
{
connected=channel.finishConnect();
}
catch(Exception e)
{
connectionFailed(channel,e,att);
}
finally
{
if (connected)
{
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
else
{
key.cancel();
}
}
}
else else
{ {
// Wrap readable registered channel in an endpoint // Wrap readable registered channel in an endpoint
@ -738,15 +530,13 @@ public abstract class SelectorManager extends AbstractLifeCycle
// Everything always handled // Everything always handled
selector.selectedKeys().clear(); selector.selectedKeys().clear();
now=System.currentTimeMillis();
_timeout.setNow(now); _timeout.setNow(now);
Task task = _timeout.expired(); Task task = _timeout.expired();
while (task!=null) while (task!=null)
{ {
if (task instanceof Runnable) if (task instanceof Runnable)
dispatch((Runnable)task); dispatch((Runnable)task);
else
task.expired();
task = _timeout.expired(); task = _timeout.expired();
} }
@ -780,7 +570,148 @@ public abstract class SelectorManager extends AbstractLifeCycle
_selecting=null; _selecting=null;
} }
} }
/* ------------------------------------------------------------ */
private void checkJvmBugs(long before, long now, long wait, int selected)
throws IOException
{
Selector selector = _selector;
if (selector==null)
return;
// Look for JVM bugs over a monitor period.
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
// http://bugs.sun.com/view_bug.do?bug_id=6693490
if (now>_monitorNext)
{
_selects=(int)(_selects*__MONITOR_PERIOD/(now-_monitorStart));
_pausing=_selects>__MAX_SELECTS;
if (_pausing)
_paused++;
_selects=0;
_jvmBug=0;
_monitorStart=now;
_monitorNext=now+__MONITOR_PERIOD;
}
if (now>_log)
{
if (_paused>0)
Log.debug(this+" Busy selector - injecting delay "+_paused+" times");
if (_jvmFix2>0)
Log.debug(this+" JVM BUG(s) - injecting delay"+_jvmFix2+" times");
if (_jvmFix1>0)
Log.debug(this+" JVM BUG(s) - recreating selector "+_jvmFix1+" times, cancelled keys "+_jvmFix0+" times");
else if(Log.isDebugEnabled() && _jvmFix0>0)
Log.debug(this+" JVM BUG(s) - cancelled keys "+_jvmFix0+" times");
_paused=0;
_jvmFix2=0;
_jvmFix1=0;
_jvmFix0=0;
_log=now+60000;
}
// If we see signature of possible JVM bug, increment count.
if (selected==0 && wait>10 && (now-before)<(wait/2))
{
// Increment bug count and try a work around
_jvmBug++;
if (_jvmBug>(__JVMBUG_THRESHHOLD))
{
try
{
if (_jvmBug==__JVMBUG_THRESHHOLD+1)
_jvmFix2++;
Thread.sleep(__BUSY_PAUSE); // pause to avoid busy loop
}
catch(InterruptedException e)
{
Log.ignore(e);
}
}
else if (_jvmBug==__JVMBUG_THRESHHOLD)
{
synchronized (this)
{
// BLOODY SUN BUG !!! Try refreshing the entire selector.
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
return;
}
}
else if (_jvmBug%32==31) // heuristic attempt to cancel key 31,63,95,... loops
{
// Cancel keys with 0 interested ops
int cancelled=0;
for (SelectionKey k: selector.keys())
{
if (k.isValid()&&k.interestOps()==0)
{
k.cancel();
cancelled++;
}
}
if (cancelled>0)
_jvmFix0++;
return;
}
}
else if (__BUSY_KEY>0 && selected==1 && _selects>__MAX_SELECTS)
{
// Look for busy key
SelectionKey busy = selector.selectedKeys().iterator().next();
if (busy==_busyKey)
{
if (++_busyKeyCount>__BUSY_KEY && !(busy.channel() instanceof ServerSocketChannel))
{
final SelectChannelEndPoint endpoint = (SelectChannelEndPoint)busy.attachment();
Log.warn("Busy Key "+busy.channel()+" "+endpoint);
busy.cancel();
if (endpoint!=null)
{
dispatch(new Runnable()
{
public void run()
{
try
{
endpoint.close();
}
catch (IOException e)
{
Log.ignore(e);
}
}
});
}
}
}
else
_busyKeyCount=0;
_busyKey=busy;
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectorManager getManager() public SelectorManager getManager()
{ {
@ -802,6 +733,8 @@ public abstract class SelectorManager extends AbstractLifeCycle
*/ */
public void scheduleTimeout(Timeout.Task task, long timeoutMs) public void scheduleTimeout(Timeout.Task task, long timeoutMs)
{ {
if (!(task instanceof Runnable))
throw new IllegalArgumentException("!Runnable");
_timeout.schedule(task, timeoutMs); _timeout.schedule(task, timeoutMs);
} }
@ -911,22 +844,16 @@ public abstract class SelectorManager extends AbstractLifeCycle
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private static class ChangeSelectableChannel private static class ChannelAndAttachment
{ {
final SelectableChannel _channel; final SelectableChannel _channel;
final Object _attachment; final Object _attachment;
public ChangeSelectableChannel(SelectableChannel channel, Object attachment) public ChannelAndAttachment(SelectableChannel channel, Object attachment)
{ {
super(); super();
_channel = channel; _channel = channel;
_attachment = attachment; _attachment = attachment;
} }
} }
/* ------------------------------------------------------------ */
private interface ChangeTask
{
public void run();
}
} }

View File

@ -90,60 +90,56 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
if (_debug) __log.debug(_session+" channel="+channel); if (_debug) __log.debug(_session+" channel="+channel);
} }
int _outCount;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void needOutBuffer() private void needOutBuffer()
{ {
if (_outNIOBuffer==null) synchronized (this)
{ {
synchronized (this) _outCount++;
{ if (_outNIOBuffer==null)
if (_outNIOBuffer==null) _outNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
_outNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
}
}
}
/* ------------------------------------------------------------ */
private void needInBuffer()
{
if (_inNIOBuffer==null)
{
synchronized (this)
{
if(_inNIOBuffer==null)
_inNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
}
} }
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void freeOutBuffer() private void freeOutBuffer()
{ {
if (_outNIOBuffer!=null && _outNIOBuffer.length()==0) synchronized (this)
{ {
synchronized (this) if (--_outCount<=0 && _outNIOBuffer!=null && _outNIOBuffer.length()==0)
{ {
if (_outNIOBuffer!=null && _outNIOBuffer.length()==0) _buffers.returnBuffer(_outNIOBuffer);
{ _outNIOBuffer=null;
_buffers.returnBuffer(_outNIOBuffer); _outCount=0;
_outNIOBuffer=null;
}
} }
} }
} }
int _inCount;
/* ------------------------------------------------------------ */
private void needInBuffer()
{
synchronized (this)
{
_inCount++;
if(_inNIOBuffer==null)
_inNIOBuffer=(NIOBuffer)_buffers.getBuffer(_session.getPacketBufferSize());
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void freeInBuffer() private void freeInBuffer()
{ {
if (_inNIOBuffer!=null && _inNIOBuffer.length()==0) synchronized (this)
{ {
synchronized (this) if (--_inCount<=0 &&_inNIOBuffer!=null && _inNIOBuffer.length()==0)
{ {
if (_inNIOBuffer!=null && _inNIOBuffer.length()==0) _buffers.returnBuffer(_inNIOBuffer);
{ _inNIOBuffer=null;
_buffers.returnBuffer(_inNIOBuffer); _inCount=0;
_inNIOBuffer=null;
}
} }
} }
} }
@ -184,10 +180,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
long end=System.currentTimeMillis()+((SocketChannel)_channel).socket().getSoTimeout(); long end=System.currentTimeMillis()+((SocketChannel)_channel).socket().getSoTimeout();
try try
{ {
if (isBufferingOutput()) while (isOpen() && isBufferingOutput()&& System.currentTimeMillis()<end)
{ {
flush(); flush();
while (isOpen() && isBufferingOutput() && System.currentTimeMillis()<end) if (isBufferingOutput())
{ {
Thread.sleep(100); // TODO non blocking Thread.sleep(100); // TODO non blocking
flush(); flush();
@ -198,14 +194,11 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
loop: while (isOpen() && !(_engine.isInboundDone() && _engine.isOutboundDone()) && System.currentTimeMillis()<end) loop: while (isOpen() && !(_engine.isInboundDone() && _engine.isOutboundDone()) && System.currentTimeMillis()<end)
{ {
if (isBufferingOutput()) while (isOpen() && isBufferingOutput() && System.currentTimeMillis()<end)
{ {
flush(); flush();
while (isOpen() && isBufferingOutput() && System.currentTimeMillis()<end) if (isBufferingOutput())
{ Thread.sleep(100);
Thread.sleep(100); // TODO non blocking
flush();
}
} }
if (_debug) __log.debug(_session+" closing "+_engine.getHandshakeStatus()); if (_debug) __log.debug(_session+" closing "+_engine.getHandshakeStatus());
@ -252,6 +245,8 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
ByteBuffer out_buffer=_outNIOBuffer.getByteBuffer(); ByteBuffer out_buffer=_outNIOBuffer.getByteBuffer();
try try
{ {
if (_outNIOBuffer.length()>0)
flush();
_outNIOBuffer.compact(); _outNIOBuffer.compact();
int put=_outNIOBuffer.putIndex(); int put=_outNIOBuffer.putIndex();
out_buffer.position(put); out_buffer.position(put);
@ -271,9 +266,10 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
} }
} }
} }
catch (InterruptedException e) catch (Exception e)
{ {
Log.ignore(e); Log.debug(e);
super.close();
} }
} }

View File

@ -411,13 +411,6 @@ public class ProxyHandler extends HandlerWrapper
private class Manager extends SelectorManager private class Manager extends SelectorManager
{ {
@Override
protected SocketChannel acceptChannel(SelectionKey key) throws IOException
{
// This is a client-side selector manager
throw new IllegalStateException();
}
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey selectionKey) throws IOException
{ {

View File

@ -67,19 +67,6 @@ public class SelectChannelConnector extends AbstractNIOConnector
private final SelectorManager _manager = new SelectorManager() private final SelectorManager _manager = new SelectorManager()
{ {
@Override
protected SocketChannel acceptChannel(SelectionKey key) throws IOException
{
// TODO handle max connections
SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
if (channel==null)
return null;
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
return channel;
}
@Override @Override
public boolean dispatch(Runnable task) public boolean dispatch(Runnable task)
{ {
@ -212,9 +199,6 @@ public class SelectChannelConnector extends AbstractNIOConnector
if (_localPort<=0) if (_localPort<=0)
throw new IOException("Server channel not bound"); throw new IOException("Server channel not bound");
// Set to non blocking mode
_acceptChannel.configureBlocking(false);
} }
} }
} }
@ -287,8 +271,32 @@ public class SelectChannelConnector extends AbstractNIOConnector
_manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
_manager.start(); _manager.start();
open(); open();
_manager.register(_acceptChannel);
super.doStart(); super.doStart();
// start a thread to accept new connections
_manager.dispatch(new Runnable()
{
public void run()
{
final ServerSocketChannel server=_acceptChannel;
while (isRunning() && _acceptChannel==server && server.isOpen())
{
try
{
SocketChannel channel = server.accept();
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
_manager.register(channel);
}
catch(IOException e)
{
Log.ignore(e);
}
}
}
});
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -33,7 +33,7 @@ public class BusySelectChannelServerTest extends HttpServerTestBase
@BeforeClass @BeforeClass
public static void init() throws Exception public static void init() throws Exception
{ {
startServer(new SelectChannelConnector() SelectChannelConnector connector=new SelectChannelConnector()
{ {
@Override @Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
@ -128,6 +128,8 @@ public class BusySelectChannelServerTest extends HttpServerTestBase
} }
}; };
} }
}); };
connector.setAcceptors(1);
startServer(connector);
} }
} }