364638 SCEP does idle timestamp checking. New setCheckForIdle method controls onIdleExpired callback.

364921 a second onIdleExpired callback will result in close rather than a shutdown output.
This commit is contained in:
Greg Wilkins 2011-11-28 11:51:01 +11:00
parent a9223fe208
commit ddce35a2e6
13 changed files with 140 additions and 198 deletions

View File

@ -315,8 +315,6 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen
@Override
public void headerComplete() throws IOException
{
if (_endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleIdle();
HttpExchange exchange = _exchange;
if (exchange!=null)
exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
@ -325,8 +323,6 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen
@Override
public void content(Buffer ref) throws IOException
{
if (_endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleIdle();
HttpExchange exchange = _exchange;
if (exchange!=null)
exchange.getEventListener().onResponseContent(ref);

View File

@ -331,21 +331,11 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
_endp.close();
}
public void scheduleIdle()
{
_endp.scheduleIdle();
}
public int fill(Buffer buffer) throws IOException
{
return _endp.fill(buffer);
}
public void cancelIdle()
{
_endp.cancelIdle();
}
public boolean isWritable()
{
return _endp.isWritable();
@ -446,9 +436,25 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
_endp.setMaxIdleTime(timeMs);
}
public void onIdleExpired()
{
_endp.onIdleExpired();
}
public void setCheckForIdle(boolean check)
{
_endp.setCheckForIdle(check);
}
public boolean isCheckForIdle()
{
return _endp.isCheckForIdle();
}
public String toString()
{
return "Upgradable:"+_endp.toString();
}
}
}

View File

@ -39,6 +39,10 @@ public abstract class AbstractConnection implements Connection
{
try
{
LOG.debug("onIdleExpired {} {}",this,_endp);
if (_endp.isInputShutdown() || _endp.isOutputShutdown())
_endp.close();
else
_endp.shutdownOutput();
}
catch(IOException e)
@ -52,7 +56,6 @@ public abstract class AbstractConnection implements Connection
catch(IOException e2)
{
LOG.ignore(e2);
}
}
}

View File

@ -32,14 +32,22 @@ public interface AsyncEndPoint extends ConnectedEndPoint
public void scheduleWrite();
/* ------------------------------------------------------------ */
/** Schedule a call to the idle timeout
/** Callback when idle.
* <p>An endpoint is idle if there has been no IO activity for
* {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true.
*/
public void scheduleIdle();
public void onIdleExpired();
/* ------------------------------------------------------------ */
/** Cancel a call to the idle timeout
/** Set if the endpoint should be checked for idleness
*/
public void cancelIdle();
public void setCheckForIdle(boolean check);
/* ------------------------------------------------------------ */
/** Get if the endpoint should be checked for idleness
*/
public boolean isCheckForIdle();
/* ------------------------------------------------------------ */
public boolean isWritable();

View File

@ -92,7 +92,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_open=true;
_key = key;
scheduleIdle();
setCheckForIdle(true);
}
/* ------------------------------------------------------------ */
@ -258,15 +258,22 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
/* ------------------------------------------------------------ */
public void scheduleIdle()
public void setCheckForIdle(boolean check)
{
_idleTimestamp=System.currentTimeMillis();
_idleTimestamp=check?System.currentTimeMillis():0;
}
/* ------------------------------------------------------------ */
public void cancelIdle()
public boolean isCheckForIdle()
{
_idleTimestamp=0;
return _idleTimestamp!=0;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
if (_idleTimestamp!=0)
_idleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
@ -274,15 +281,28 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
long idleTimestamp=_idleTimestamp;
if (!getChannel().isOpen() || idleTimestamp!=0 && _maxIdleTime>0 && now>(idleTimestamp+_maxIdleTime))
{
onIdleExpired();
_idleTimestamp=now;
}
}
/* ------------------------------------------------------------ */
protected void onIdleExpired()
public void onIdleExpired()
{
_connection.onIdleExpired();
}
/* ------------------------------------------------------------ */
@Override
public int fill(Buffer buffer) throws IOException
{
int fill=super.fill(buffer);
if (fill>0)
notIdle();
return fill;
}
/* ------------------------------------------------------------ */
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
@ -302,6 +322,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
else if (l>0)
{
_writable=true;
notIdle();
}
return l;
}
@ -327,6 +348,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
else if (l>0)
{
_writable=true;
notIdle();
}
return l;
@ -346,6 +368,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
long now=_selectSet.getNow();
long end=now+timeoutMs;
boolean check=isCheckForIdle();
setCheckForIdle(true);
try
{
_readBlocked=true;
@ -372,6 +396,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
finally
{
_readBlocked=false;
setCheckForIdle(check);
}
}
return true;
@ -391,6 +416,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
long now=_selectSet.getNow();
long end=now+timeoutMs;
boolean check=isCheckForIdle();
setCheckForIdle(true);
try
{
_writeBlocked=true;
@ -416,8 +443,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
finally
{
_writeBlocked=false;
if (_idleTimestamp!=-1)
scheduleIdle();
setCheckForIdle(check);
}
}
return true;
@ -528,7 +554,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
_key.cancel();
}
cancelIdle();
if (_open)
{
@ -557,7 +582,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if (_key!=null && _key.isValid())
_key.cancel();
cancelIdle();
if (_open)
{
_open=false;

View File

@ -703,14 +703,19 @@ public class SslConnection extends AbstractConnection implements AsyncConnection
_aEndp.scheduleWrite();
}
public void scheduleIdle()
public void onIdleExpired()
{
_aEndp.scheduleIdle();
_aEndp.onIdleExpired();
}
public void cancelIdle()
public void setCheckForIdle(boolean check)
{
_aEndp.cancelIdle();
_aEndp.setCheckForIdle(check);
}
public boolean isCheckForIdle()
{
return _aEndp.isCheckForIdle();
}
public void scheduleTimeout(Task task, long timeoutMs)

View File

@ -24,10 +24,12 @@ import org.junit.Test;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SelectChannelEndPointTest
{
protected SelectChannelEndPoint _lastEndp;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected SelectorManager _manager = new SelectorManager()
@ -64,6 +66,7 @@ public class SelectChannelEndPointTest
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet,key,2000);
endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
_lastEndp=endp;
return endp;
}
};
@ -116,10 +119,7 @@ public class SelectChannelEndPointTest
progress=false;
_in.compact();
if (_in.space()>0 && _endp.fill(_in)>0)
{
progress=true;
((AsyncEndPoint)_endp).cancelIdle();
}
while (_blockAt>0 && _in.length()>0 && _in.length()<_blockAt)
{
@ -326,6 +326,54 @@ public class SelectChannelEndPointTest
}
}
@Test
public void testIdle() throws Exception
{
Socket client = newClient();
client.setSoTimeout(3000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.register(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes("UTF-8"));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b>0);
assertEquals(c,(char)b);
}
// Set Max idle
_lastEndp.setMaxIdleTime(500);
// read until idle shutdown received
long start=System.currentTimeMillis();
int b=client.getInputStream().read();
assertEquals(-1,b);
long idle=System.currentTimeMillis()-start;
assertTrue(idle>400);
assertTrue(idle<2000);
// But endpoint is still open.
assertTrue(_lastEndp.isOpen());
// Wait for another idle callback
Thread.sleep(1000);
// endpoint is closed.
assertFalse(_lastEndp.isOpen());
}
@Test
public void testStress() throws Exception
{

View File

@ -825,8 +825,6 @@ public abstract class AbstractHttpConnection extends AbstractConnection
@Override
public void headerComplete() throws IOException
{
if (_endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleIdle();
_requests++;
_generator.setVersion(_version);
switch (_version)
@ -902,8 +900,6 @@ public abstract class AbstractHttpConnection extends AbstractConnection
@Override
public void content(Buffer ref) throws IOException
{
if (_endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleIdle();
if (_delayedHandling)
{
_delayedHandling=false;

View File

@ -122,7 +122,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
public void customize(EndPoint endpoint, Request request) throws IOException
{
AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
aEndp.cancelIdle();
aEndp.setCheckForIdle(false);
request.setTimeStamp(System.currentTimeMillis());
endpoint.setMaxIdleTime(_maxIdleTime);
super.customize(endpoint, request);
@ -132,7 +132,8 @@ public class SelectChannelConnector extends AbstractNIOConnector
@Override
public void persist(EndPoint endpoint) throws IOException
{
((AsyncEndPoint)endpoint).scheduleIdle();
AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
aEndp.setCheckForIdle(true);
super.persist(endpoint);
}

View File

@ -43,7 +43,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
public final static byte LENGTH_FRAME=(byte)0x80;
public final static byte SENTINEL_FRAME=(byte)0x00;
final IdleCheck _idle;
final WebSocketParser _parser;
final WebSocketGenerator _generator;
final WebSocket _websocket;
@ -56,8 +55,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
throws IOException
{
super(endpoint,timestamp);
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_endp.setMaxIdleTime(maxIdleTime);
@ -66,28 +63,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_generator = new WebSocketGeneratorD00(buffers, _endp);
_parser = new WebSocketParserD00(buffers, endpoint, new FrameHandlerD00(_websocket));
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
}
/* ------------------------------------------------------------ */
@ -189,8 +164,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
{
if (_endp.isOpen())
{
_idle.access(_endp);
if (_endp.isInputShutdown() && _generator.isBufferEmpty())
_endp.close();
else
@ -252,7 +225,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0,SENTINEL_FRAME,data,0,data.length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -261,7 +233,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0,LENGTH_FRAME,data,offset,length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -284,7 +255,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0,opcode,content,offset,length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -370,11 +340,6 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}
}
private interface IdleCheck
{
void access(EndPoint endp);
}
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
{
String uri=request.getRequestURI();

View File

@ -75,7 +75,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
private final static byte[] MAGIC;
private final IdleCheck _idle;
private final WebSocketParser _parser;
private final WebSocketGenerator _generator;
private final WebSocket _webSocket;
@ -115,9 +114,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
super(endpoint,timestamp);
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_endp.setMaxIdleTime(maxIdleTime);
_webSocket = websocket;
@ -129,28 +125,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
_protocol=protocol;
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
_maxTextMessageSize=buffers.getBufferSize();
_maxBinaryMessageSize=-1;
}
@ -199,7 +173,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
if (_endp.isOpen())
{
_idle.access(_endp);
if (_closedIn && _closedOut && _generator.isBufferEmpty())
_endp.close();
else if (_endp.isInputShutdown() && !_closedIn)
@ -333,7 +306,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -344,7 +316,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -355,7 +326,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_generator.addFrame(flags,opcode,content,offset,length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -366,7 +336,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_generator.addFrame((byte)0x8,control,data,offset,length);
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -730,12 +699,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
}
}
/* ------------------------------------------------------------ */
private interface IdleCheck
{
void access(EndPoint endp);
}
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
{

View File

@ -76,7 +76,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
}
private final static byte[] MAGIC;
private final IdleCheck _idle;
private final List<Extension> _extensions;
private final WebSocketParserD08 _parser;
private final WebSocketParser.FrameHandler _inbound;
@ -129,9 +128,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
_context=Thread.currentThread().getContextClassLoader();
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_draft=draft;
_endp.setMaxIdleTime(maxIdleTime);
@ -163,27 +159,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
_protocol=protocol;
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
}
/* ------------------------------------------------------------ */
@ -245,7 +220,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
_generator.returnBuffer();
if (_endp.isOpen())
{
_idle.access(_endp);
if (_closedIn && _closedOut && _outbound.isBufferEmpty())
_endp.close();
else if (_endp.isInputShutdown() && !_closedIn)
@ -416,7 +390,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
byte[] data = content.getBytes(StringUtil.__UTF8);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_TEXT,data,0,data.length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -426,7 +399,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_BINARY,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -436,7 +408,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame(flags,opcode,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -446,7 +417,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -829,12 +799,6 @@ public class WebSocketConnectionD08 extends AbstractConnection implements WebSoc
}
}
/* ------------------------------------------------------------ */
private interface IdleCheck
{
void access(EndPoint endp);
}
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
{

View File

@ -105,7 +105,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
}
private final static byte[] MAGIC;
private final IdleCheck _idle;
private final List<Extension> _extensions;
private final WebSocketParserD13 _parser;
private final WebSocketGeneratorD13 _generator;
@ -155,9 +154,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
_context=Thread.currentThread().getContextClassLoader();
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_draft=draft;
_endp.setMaxIdleTime(maxIdleTime);
@ -190,28 +186,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
_protocol=protocol;
// TODO should these be AsyncEndPoint checks/calls?
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
}
/* ------------------------------------------------------------ */
@ -273,7 +247,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
_generator.returnBuffer();
if (_endp.isOpen())
{
_idle.access(_endp);
if (_closedIn && _closedOut && _outbound.isBufferEmpty())
_endp.close();
else if (_endp.isInputShutdown() && !_closedIn)
@ -439,7 +412,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
byte[] data = content.getBytes(StringUtil.__UTF8);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_TEXT,data,0,data.length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -449,7 +421,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_BINARY,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -459,7 +430,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame(flags,opcode,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -470,7 +440,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
_outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
@ -948,12 +917,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
}
}
/* ------------------------------------------------------------ */
private interface IdleCheck
{
void access(EndPoint endp);
}
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
{