312243 Optimized timeout handling

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1764 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2010-05-12 12:20:49 +00:00
parent 79633ecf96
commit 91bfa30e6c
19 changed files with 273 additions and 218 deletions

View File

@ -16,6 +16,7 @@ jetty-7.1.0 5 May 2010
+ 306353 fixed cross context dispatch to root context.
+ 311154 Added deprecated StringBuffer API for backwards compatibility
+ 311554 Protect shutdown thread from Server#doStop
+ 312243 Optimized timeout handling
jetty-7.1.0.RC1 5 May 2010
+ 286889 Allow System and Server classes to be set on Server instance and when applied to all webapps

View File

@ -34,8 +34,6 @@ import org.eclipse.jetty.server.Server;
* Connection implementation of the Ajp13 protocol. <p/> XXX Refactor to remove
* duplication of HttpConnection
*
*
*
*/
public class Ajp13Connection extends HttpConnection
{

View File

@ -602,8 +602,6 @@ public class Ajp13Generator extends AbstractGenerator
total += len;
}
return total;
}
catch (IOException e)

View File

@ -33,12 +33,13 @@ public class Ajp13SocketConnector extends SocketConnector
static boolean __allowShutdown = false;
public Ajp13SocketConnector()
{
super.setHeaderBufferSize(Ajp13Packet.MAX_DATA_SIZE);
super.setRequestHeaderSize(Ajp13Packet.MAX_DATA_SIZE);
super.setResponseHeaderSize(Ajp13Packet.MAX_DATA_SIZE);
super.setRequestBufferSize(Ajp13Packet.MAX_DATA_SIZE);
super.setResponseBufferSize(Ajp13Packet.MAX_DATA_SIZE);
// IN AJP protocol the socket stay open, so
// by default the time out is set to 900 seconds
super.setMaxIdleTime(900000);
// by default the time out is set to 0 seconds
super.setMaxIdleTime(0);
}
@Override
@ -60,6 +61,9 @@ public class Ajp13SocketConnector extends SocketConnector
super.customize(endpoint,request);
if (request.isSecure())
request.setScheme(HttpSchemes.HTTPS);
System.err.println("Customize "+endpoint+" "+request);
}
/* ------------------------------------------------------------ */

View File

@ -14,10 +14,13 @@
package org.eclipse.jetty.ajp;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -26,7 +29,9 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -48,7 +53,6 @@ public class Ajp13ConnectionTest
_connector=new Ajp13SocketConnector();
_connector.setPort(0);
_connector.setMaxIdleTime(100);
_server.setConnectors(new Connector[] { _connector });
_server.setHandler(new Handler());
_server.start();
@ -65,6 +69,7 @@ public class Ajp13ConnectionTest
public void openSocket() throws Exception
{
_client=new Socket("localhost",_connector.getLocalPort());
_client.setSoTimeout(500);
}
@After
@ -292,29 +297,17 @@ public class Ajp13ConnectionTest
// TODO: char array instead of string?
private String readResponse(Socket _client) throws IOException
{
BufferedReader br=null;
ByteArrayOutputStream bout = new ByteArrayOutputStream();
try
{
br=new BufferedReader(new InputStreamReader(_client.getInputStream()));
StringBuffer sb=new StringBuffer();
String line;
while ((line=br.readLine()) != null)
{
sb.append(line);
sb.append('\n');
}
return sb.toString();
IO.copy(_client.getInputStream(),bout);
}
finally
catch(SocketTimeoutException e)
{
if (br != null)
{
br.close();
}
Log.ignore(e);
}
return bout.toString("utf-8");
}
public static class Handler extends AbstractHandler

View File

@ -119,35 +119,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
{
Log.info(""+_result);
}
/* ------------------------------------------------------------ */
/* (non-Javadoc)
* @see org.eclipse.io.nio.SelectChannelEndPoint#idleExpired()
*/
@Override
protected void idleExpired()
{
try
{
getSelectManager().dispatch(new Runnable()
{
public void run()
{
doIdleExpired();
}
});
}
catch(Exception e)
{
Log.ignore(e);
}
}
/* ------------------------------------------------------------ */
protected void doIdleExpired()
{
super.idleExpired();
}
/* ------------------------------------------------------------ */
@Override

View File

@ -24,13 +24,15 @@ import java.io.IOException;
*/
public class ByteArrayEndPoint implements ConnectedEndPoint
{
byte[] _inBytes;
ByteArrayBuffer _in;
ByteArrayBuffer _out;
boolean _closed;
boolean _nonBlocking;
boolean _growOutput;
Connection _connection;
protected byte[] _inBytes;
protected ByteArrayBuffer _in;
protected ByteArrayBuffer _out;
protected boolean _closed;
protected boolean _nonBlocking;
protected boolean _growOutput;
protected Connection _connection;
protected int _maxIdleTime;
/* ------------------------------------------------------------ */
/**
@ -353,5 +355,23 @@ public class ByteArrayEndPoint implements ConnectedEndPoint
_growOutput=growOutput;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime()
*/
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int)
*/
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
}

View File

@ -16,14 +16,12 @@ package org.eclipse.jetty.io;
import java.io.IOException;
/**
*
* A transport EndPoint
*/
public interface EndPoint
{
/**
* Close any backing stream associated with the buffer
*/
@ -149,4 +147,25 @@ public interface EndPoint
*/
public void flush() throws IOException;
/* ------------------------------------------------------------ */
/** Get the max idle time in ms.
* <p>The max idle time is the time the endpoint can be idle before
* extraordinary handling takes place. This loosely corresponds to
* the {@link java.net.Socket#getSoTimeout()} for blocking connections,
* but {@link AsyncEndPoint} implementations must use other mechanisms
* to implement the max idle time.
* @return the max idle time in ms.
*/
public int getMaxIdleTime();
/* ------------------------------------------------------------ */
/** Set the max idle time.
* @param timeMs the max idle time in MS.
* @throws IOException if the timeout cannot be set.
*/
public void setMaxIdleTime(int timeMs) throws IOException;
}

View File

@ -41,6 +41,18 @@ public class SocketEndPoint extends StreamEndPoint
{
super(socket.getInputStream(),socket.getOutputStream());
_socket=socket;
super.setMaxIdleTime(_socket.getSoTimeout());
}
/**
*
*/
protected SocketEndPoint(Socket socket, int maxIdleTime)
throws IOException
{
super(socket.getInputStream(),socket.getOutputStream());
_socket=socket;
super.setMaxIdleTime(maxIdleTime);
}
/* (non-Javadoc)
@ -177,4 +189,18 @@ public class SocketEndPoint extends StreamEndPoint
{
return _socket;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
*/
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
if (timeMs!=getMaxIdleTime())
_socket.setSoTimeout(timeMs>0?timeMs:0);
super.setMaxIdleTime(timeMs);
}
}

View File

@ -31,6 +31,7 @@ public class StreamEndPoint implements EndPoint
{
InputStream _in;
OutputStream _out;
int _maxIdleTime;
/**
*
@ -280,5 +281,17 @@ public class StreamEndPoint implements EndPoint
{
return false;
}
/* ------------------------------------------------------------ */
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
public void setMaxIdleTime(int timeMs) throws IOException
{
_maxIdleTime=timeMs;
}
}

View File

@ -41,15 +41,30 @@ public class ChannelEndPoint implements EndPoint
protected final Socket _socket;
protected InetSocketAddress _local;
protected InetSocketAddress _remote;
protected int _maxIdleTime;
/**
*
*/
public ChannelEndPoint(ByteChannel channel)
public ChannelEndPoint(ByteChannel channel) throws IOException
{
super();
this._channel = channel;
_socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
if (_socket!=null)
_maxIdleTime=_socket.getSoTimeout();
}
/**
*
*/
protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException
{
this._channel = channel;
_maxIdleTime=maxIdleTime;
_socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null;
if (_socket!=null)
_socket.setSoTimeout(_maxIdleTime);
}
public boolean isBlocking()
@ -442,4 +457,21 @@ public class ChannelEndPoint implements EndPoint
{
return false;
}
/* ------------------------------------------------------------ */
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int)
*/
public void setMaxIdleTime(int timeMs) throws IOException
{
if (_socket!=null && timeMs!=_maxIdleTime)
_socket.setSoTimeout(timeMs>0?timeMs:0);
_maxIdleTime=timeMs;
}
}

View File

@ -46,10 +46,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
private boolean _readBlocked;
private boolean _writeBlocked;
private boolean _open;
private final Timeout.Task _idleTask = new IdleTask();
private volatile long _idleTimestamp;
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
throws IOException
{
super(channel);
@ -61,7 +62,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_key = key;
_connection = _manager.newConnection(channel,this);
_manager.endPointOpened(this);
scheduleIdle();
}
@ -198,13 +198,23 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
/* ------------------------------------------------------------ */
public void scheduleIdle()
{
_selectSet.scheduleIdle(_idleTask);
_idleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
public void cancelIdle()
{
_selectSet.cancelIdle(_idleTask);
_idleTimestamp=0;
}
/* ------------------------------------------------------------ */
public void checkIdleTimestamp(long now)
{
if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime))
{
System.err.println("EXPIRED "+now+">("+_idleTimestamp+"+"+_maxIdleTime+")");
idleExpired();
}
}
/* ------------------------------------------------------------ */
@ -336,7 +346,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
finally
{
_writeBlocked=false;
if (_idleTask.isScheduled())
if (_idleTimestamp!=-1)
scheduleIdle();
}
}
@ -426,7 +436,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
cancelIdle();
if (_open)
_manager.endPointClosed(this);
{
_selectSet.destroyEndPoint(this);
}
_open=false;
_key = null;
}
@ -452,7 +464,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
cancelIdle();
if (_open)
_manager.endPointClosed(this);
{
_selectSet.destroyEndPoint(this);
}
_open=false;
_key = null;
}
@ -554,12 +568,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
}
}
/* ------------------------------------------------------------ */
public Timeout.Task getTimeoutTask()
{
return _idleTask;
}
/* ------------------------------------------------------------ */
public SelectSet getSelectSet()
{
@ -567,24 +575,16 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class IdleTask extends Timeout.Task
/**
* Don't set the SoTimeout
* @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
*/
@Override
public void setMaxIdleTime(int timeMs) throws IOException
{
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.thread.Timeout.Task#expire()
*/
@Override
public void expired()
{
idleExpired();
}
@Override
public String toString()
{
return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
}
_maxIdleTime=timeMs;
}
}

View File

@ -21,7 +21,11 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
@ -49,9 +53,9 @@ public abstract class SelectorManager extends AbstractLifeCycle
private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue();
private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue();
private long _maxIdleTime;
private int _maxIdleTime;
private int _lowResourcesMaxIdleTime;
private long _lowResourcesConnections;
private long _lowResourcesMaxIdleTime;
private transient SelectSet[] _selectSet;
private int _selectSets=1;
private volatile int _set;
@ -63,7 +67,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
*/
public void setMaxIdleTime(long maxIdleTime)
{
_maxIdleTime=maxIdleTime;
_maxIdleTime=(int)maxIdleTime;
}
/* ------------------------------------------------------------ */
@ -167,7 +171,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
*/
public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
{
_lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
_lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
}
/* ------------------------------------------------------------ */
@ -296,13 +300,13 @@ public abstract class SelectorManager extends AbstractLifeCycle
public class SelectSet
{
private final int _setID;
private final Timeout _idleTimeout;
private final Timeout _timeout;
private final List<Object>[] _changes;
private int _change;
private int _nextSet;
private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
private Selector _selector;
private int _nextSet;
private volatile Thread _selecting;
private int _jvmBug;
private int _selects;
@ -316,21 +320,20 @@ public abstract class SelectorManager extends AbstractLifeCycle
private int _jvmFix0;
private int _jvmFix1;
private int _jvmFix2;
private volatile long _idleTick;
private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
/* ------------------------------------------------------------ */
SelectSet(int acceptorID) throws Exception
{
_setID=acceptorID;
_idleTimeout = new Timeout(this);
_idleTimeout.setDuration(getMaxIdleTime());
_idleTick = System.currentTimeMillis();
_timeout = new Timeout(this);
_timeout.setDuration(0L);
_changes = new List[] {new ArrayList(),new ArrayList()};
// create a selector;
_selector = Selector.open();
_change=0;
_monitorStart=System.currentTimeMillis();
_monitorNext=_monitorStart+__MONITOR_PERIOD;
_log=_monitorStart+60000;
@ -339,10 +342,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
/* ------------------------------------------------------------ */
public void addChange(Object point)
{
synchronized (_changes)
{
_changes[_change].add(point);
}
_changes.add(point);
}
/* ------------------------------------------------------------ */
@ -356,12 +356,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
addChange(new ChangeSelectableChannel(channel,att));
}
/* ------------------------------------------------------------ */
public void cancelIdle(Timeout.Task task)
{
task.cancel();
}
/* ------------------------------------------------------------ */
/**
* Select and dispatch tasks found from changes and the selector.
@ -373,44 +367,36 @@ public abstract class SelectorManager extends AbstractLifeCycle
try
{
_selecting=Thread.currentThread();
List<?> changes;
final Selector selector;
synchronized (_changes)
{
changes=_changes[_change];
_change=_change==0?1:0;
selector=_selector;
}
final Selector selector=_selector;
// Make any key changes required
final int size=changes.size();
for (int i = 0; i < size; i++)
Object change;
int changes=_changes.size();
while (changes-->0 && (change=_changes.poll())!=null)
{
try
{
Object o = changes.get(i);
if (o instanceof EndPoint)
if (change instanceof EndPoint)
{
// Update the operations for a key.
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
endpoint.doUpdateKey();
}
else if (o instanceof Runnable)
else if (change instanceof Runnable)
{
dispatch((Runnable)o);
dispatch((Runnable)change);
}
else if (o instanceof ChangeSelectableChannel)
else if (change instanceof ChangeSelectableChannel)
{
// finish accepting/connecting this connection
final ChangeSelectableChannel asc = (ChangeSelectableChannel)o;
final ChangeSelectableChannel asc = (ChangeSelectableChannel)change;
final SelectableChannel channel=asc._channel;
final Object att = asc._attachment;
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
{
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key);
SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
key.attach(endpoint);
endpoint.schedule();
}
@ -419,14 +405,14 @@ public abstract class SelectorManager extends AbstractLifeCycle
channel.register(selector,SelectionKey.OP_CONNECT,att);
}
}
else if (o instanceof SocketChannel)
else if (change instanceof SocketChannel)
{
final SocketChannel channel=(SocketChannel)o;
final SocketChannel channel=(SocketChannel)change;
if (channel.isConnected())
{
SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
@ -435,17 +421,17 @@ public abstract class SelectorManager extends AbstractLifeCycle
channel.register(selector,SelectionKey.OP_CONNECT,null);
}
}
else if (o instanceof ServerSocketChannel)
else if (change instanceof ServerSocketChannel)
{
ServerSocketChannel channel = (ServerSocketChannel)o;
ServerSocketChannel channel = (ServerSocketChannel)change;
channel.register(getSelector(),SelectionKey.OP_ACCEPT);
}
else if (o instanceof ChangeTask)
else if (change instanceof ChangeTask)
{
((ChangeTask)o).run();
((ChangeTask)change).run();
}
else
throw new IllegalArgumentException(o.toString());
throw new IllegalArgumentException(change.toString());
}
catch (Exception e)
{
@ -455,28 +441,15 @@ public abstract class SelectorManager extends AbstractLifeCycle
Log.debug(e);
}
}
changes.clear();
long idle_next;
long retry_next;
long now=System.currentTimeMillis();
synchronized (this)
{
_idleTimeout.setNow(now);
_timeout.setNow(now);
if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
_idleTimeout.setDuration(_lowResourcesMaxIdleTime);
else
_idleTimeout.setDuration(_maxIdleTime);
idle_next=_idleTimeout.getTimeToNext();
retry_next=_timeout.getTimeToNext();
}
_timeout.setNow(now);
retry_next=_timeout.getTimeToNext();
// workout how low to wait in select
long wait = 1000L; // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
if (idle_next >= 0 && wait > idle_next)
wait = idle_next;
long wait = 1000L;
if (wait > 0 && retry_next >= 0 && wait > retry_next)
wait = retry_next;
@ -499,7 +472,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
long before=now;
int selected=selector.select(wait);
now = System.currentTimeMillis();
_idleTimeout.setNow(now);
_timeout.setNow(now);
_selects++;
@ -680,7 +652,8 @@ public abstract class SelectorManager extends AbstractLifeCycle
{
// bind connections to this select set.
SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey);
cKey.attach(endpoint);
if (endpoint != null)
endpoint.schedule();
@ -710,7 +683,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
if (connected)
{
key.interestOps(SelectionKey.OP_READ);
SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
endpoint.schedule();
}
@ -724,7 +697,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
{
// Wrap readable registered channel in an endpoint
SocketChannel channel = (SocketChannel)key.channel();
SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint);
if (key.isReadable())
endpoint.schedule();
@ -750,9 +723,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
// Everything always handled
selector.selectedKeys().clear();
// tick over the timers
_idleTimeout.tick(now);
_timeout.setNow(now);
Task task = _timeout.expired();
while (task!=null)
@ -764,6 +734,27 @@ public abstract class SelectorManager extends AbstractLifeCycle
task = _timeout.expired();
}
// Idle tick
if (now-_idleTick>1000)
{
_idleTick=now;
final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
:now;
dispatch(new Runnable()
{
public void run()
{
for (SelectChannelEndPoint endp:_endPoints.keySet())
{
endp.checkIdleTimestamp(idle_now);
}
}
});
}
}
catch (CancelledKeyException e)
{
@ -784,15 +775,7 @@ public abstract class SelectorManager extends AbstractLifeCycle
/* ------------------------------------------------------------ */
public long getNow()
{
return _idleTimeout.getNow();
}
/* ------------------------------------------------------------ */
public void scheduleIdle(Timeout.Task task)
{
if (_idleTimeout.getDuration() <= 0)
return;
_idleTimeout.schedule(task);
return _timeout.getNow();
}
/* ------------------------------------------------------------ */
@ -820,6 +803,22 @@ public abstract class SelectorManager extends AbstractLifeCycle
if (selector!=null)
selector.wakeup();
}
/* ------------------------------------------------------------ */
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
{
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
endPointOpened(endp);
_endPoints.put(endp,this);
return endp;
}
/* ------------------------------------------------------------ */
public void destroyEndPoint(SelectChannelEndPoint endp)
{
_endPoints.remove(endp);
endPointClosed(endp);
}
/* ------------------------------------------------------------ */
Selector getSelector()
@ -865,7 +864,6 @@ public abstract class SelectorManager extends AbstractLifeCycle
selecting=_selecting!=null;
}
_idleTimeout.cancelAll();
_timeout.cancelAll();
try
{

View File

@ -490,8 +490,6 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector
try
{
socket.setTcpNoDelay(true);
if (_maxIdleTime >= 0)
socket.setSoTimeout(_maxIdleTime);
if (_soLingerTime >= 0)
socket.setSoLinger(true,_soLingerTime / 1000);
else

View File

@ -127,11 +127,7 @@ public class SocketConnector extends AbstractConnector
{
ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime;
if (connection._sotimeout!=lrmit)
{
connection._sotimeout=lrmit;
((Socket)endpoint.getTransport()).setSoTimeout(lrmit);
}
connection.setMaxIdleTime(lrmit);
super.customize(endpoint, request);
}
@ -177,14 +173,12 @@ public class SocketConnector extends AbstractConnector
{
boolean _dispatched=false;
volatile Connection _connection;
int _sotimeout;
protected final Socket _socket;
public ConnectorEndPoint(Socket socket) throws IOException
{
super(socket);
super(socket,_maxIdleTime);
_connection = newConnection(this);
_sotimeout=socket.getSoTimeout();
_socket=socket;
}
@ -241,14 +235,7 @@ public class SocketConnector extends AbstractConnector
if (_connection.isIdle())
{
if (isLowResources())
{
int lrmit = getLowResourcesMaxIdleTime();
if (lrmit>=0 && _sotimeout!= lrmit)
{
_sotimeout=lrmit;
_socket.setSoTimeout(_sotimeout);
}
}
setMaxIdleTime(getLowResourcesMaxIdleTime());
}
_connection=_connection.handle();

View File

@ -100,14 +100,8 @@ public class BlockingChannelConnector extends AbstractNIOConnector
public void customize(EndPoint endpoint, Request request)
throws IOException
{
ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
if (connection._sotimeout!=_maxIdleTime)
{
connection._sotimeout=_maxIdleTime;
((SocketChannel)endpoint.getTransport()).socket().setSoTimeout(_maxIdleTime);
}
super.customize(endpoint, request);
endpoint.setMaxIdleTime(_maxIdleTime);
configure(((SocketChannel)endpoint.getTransport()).socket());
}
@ -130,12 +124,12 @@ public class BlockingChannelConnector extends AbstractNIOConnector
int _sotimeout;
ConnectorEndPoint(ByteChannel channel)
throws IOException
{
super(channel);
super(channel,BlockingChannelConnector.this._maxIdleTime);
_connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
}
/* ------------------------------------------------------------ */
/** Get the connection.
* @return the connection

View File

@ -403,7 +403,7 @@ public class StressTest extends TestCase
if (_stress)
{
System.err.println("STRESS!");
doThreads(200,400,true);
doThreads(200,100,true);
}
else
doThreads(20,40,true);

View File

@ -8,6 +8,7 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Timeout;
public class WebSocketConnection implements Connection, WebSocket.Outbound
{
@ -17,24 +18,26 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
final WebSocketGenerator _generator;
final long _timestamp;
final WebSocket _websocket;
final int _maxIdleTimeMs;
public WebSocketConnection(WebSocket websocket, EndPoint endpoint)
throws IOException
{
this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000);
}
public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, long maxIdleTime)
public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime)
throws IOException
{
// TODO - can we use the endpoint idle mechanism?
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_endp = endpoint;
_endp.setMaxIdleTime(maxIdleTime);
_timestamp = timestamp;
_websocket = websocket;
_generator = new WebSocketGenerator(buffers, _endp);
_maxIdleTimeMs=(int)maxIdleTime;
_parser = new WebSocketParser(buffers, endpoint, new WebSocketParser.EventHandler()
{
public void onFrame(byte frame, String data)
@ -81,10 +84,10 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{
public void access(EndPoint endp)
{
scep.getSelectSet().scheduleTimeout(scep.getTimeoutTask(),_maxIdleTimeMs);
scep.scheduleIdle();
}
};
scep.getSelectSet().scheduleTimeout(scep.getTimeoutTask(),_maxIdleTimeMs);
scep.scheduleIdle();
}
else
{
@ -162,7 +165,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
public void sendMessage(byte frame, String content) throws IOException
{
_generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.addFrame(frame,content,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
@ -175,7 +178,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException
{
_generator.addFrame(frame,content,offset,length,_maxIdleTimeMs);
_generator.addFrame(frame,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
@ -185,7 +188,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{
try
{
_generator.flush(_maxIdleTimeMs);
_generator.flush(_endp.getMaxIdleTime());
_endp.close();
}
catch(IOException e)

View File

@ -16,7 +16,7 @@ import org.eclipse.jetty.server.HttpConnection;
public class WebSocketFactory
{
private WebSocketBuffers _buffers;
private long _maxIdleTime=300000;
private int _maxIdleTime=300000;
/* ------------------------------------------------------------ */
public WebSocketFactory()
@ -43,7 +43,7 @@ public class WebSocketFactory
/** Set the maxIdleTime.
* @param maxIdleTime the maxIdleTime to set
*/
public void setMaxIdleTime(long maxIdleTime)
public void setMaxIdleTime(int maxIdleTime)
{
_maxIdleTime = maxIdleTime;
}