404517 Close connection if request received after half close

This commit is contained in:
Greg Wilkins 2013-04-04 15:36:37 +11:00
parent 3cd6d90e9f
commit df6e18cc00
1 changed files with 84 additions and 42 deletions

View File

@ -63,11 +63,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
*/
private volatile AsyncConnection _connection;
/** true if a thread has been dispatched to handle this endpoint */
private boolean _dispatched = false;
/** true if a non IO dispatch (eg async resume) is outstanding */
private boolean _asyncDispatch = false;
private static final int STATE_NEEDS_DISPATCH=-1;
private static final int STATE_UNDISPATCHED=0;
private static final int STATE_DISPATCHED=1;
private static final int STATE_ASYNC=2;
private int _state;
private boolean _onIdle;
/** true if the last write operation succeed and wrote all offered bytes */
private volatile boolean _writable = true;
@ -83,6 +85,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private boolean _open;
private volatile long _idleTimestamp;
private volatile boolean _checkIdle;
private boolean _ishut;
@ -94,8 +97,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
_manager = selectSet.getManager();
_selectSet = selectSet;
_dispatched = false;
_asyncDispatch = false;
_state=STATE_UNDISPATCHED;
_onIdle=false;
_open=true;
_key = key;
@ -169,7 +172,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
// we are not interested in further selecting
_key.interestOps(0);
if (!_dispatched)
if (_state<STATE_DISPATCHED)
updateKey();
return;
}
@ -184,13 +187,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}
// If dispatched, then deregister interest
if (_dispatched)
if (_state>=STATE_DISPATCHED)
_key.interestOps(0);
else
{
// other wise do the dispatch
dispatch();
if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
{
_key.interestOps(0);
}
@ -203,10 +206,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized(this)
{
if (_dispatched)
_asyncDispatch=true;
else
dispatch();
switch(_state)
{
case STATE_NEEDS_DISPATCH:
case STATE_UNDISPATCHED:
dispatch();
break;
case STATE_DISPATCHED:
case STATE_ASYNC:
_state=STATE_ASYNC;
break;
}
}
}
@ -215,15 +226,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized(this)
{
if (!_dispatched)
if (_state<=STATE_UNDISPATCHED)
{
_dispatched = true;
boolean dispatched = _manager.dispatch(_handler);
if(!dispatched)
if (_onIdle)
_state = STATE_NEEDS_DISPATCH;
else
{
_dispatched = false;
LOG.warn("Dispatched Failed! "+this+" to "+_manager);
updateKey();
_state = STATE_DISPATCHED;
boolean dispatched = _manager.dispatch(_handler);
if(!dispatched)
{
_state = STATE_UNDISPATCHED;
LOG.warn("Dispatched Failed! "+this+" to "+_manager);
updateKey();
}
}
}
}
@ -240,15 +256,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
synchronized (this)
{
if (_asyncDispatch)
switch(_state)
{
_asyncDispatch=false;
return false;
case STATE_ASYNC:
_state=STATE_DISPATCHED;
return false;
default:
_state=STATE_UNDISPATCHED;
updateKey();
return true;
}
_dispatched = false;
updateKey();
}
return true;
}
/* ------------------------------------------------------------ */
@ -266,30 +285,33 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
public void setCheckForIdle(boolean check)
{
_idleTimestamp=check?System.currentTimeMillis():0;
if (check)
{
_idleTimestamp=System.currentTimeMillis();
_checkIdle=true;
}
else
_checkIdle=false;
}
/* ------------------------------------------------------------ */
public boolean isCheckForIdle()
{
return _idleTimestamp!=0;
return _checkIdle;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
if (_idleTimestamp!=0)
_idleTimestamp=System.currentTimeMillis();
_idleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
public void checkIdleTimestamp(long now)
{
long idleTimestamp=_idleTimestamp;
if (idleTimestamp!=0 && _maxIdleTime>0)
if (isCheckForIdle() && _maxIdleTime>0)
{
final long idleForMs=now-idleTimestamp;
final long idleForMs=now-_idleTimestamp;
if (idleForMs>_maxIdleTime)
{
@ -316,7 +338,27 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */
public void onIdleExpired(long idleForMs)
{
_connection.onIdleExpired(idleForMs);
System.err.println("ON IDLE EXPIRED");
try
{
synchronized (this)
{
_onIdle=true;
}
if (_maxIdleTime>0 && (System.currentTimeMillis()-_idleTimestamp)>_maxIdleTime)
_connection.onIdleExpired(idleForMs);
}
finally
{
synchronized (this)
{
_onIdle=false;
if (_state==STATE_NEEDS_DISPATCH)
dispatch();
}
}
}
/* ------------------------------------------------------------ */
@ -341,7 +383,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
synchronized (this)
{
_writable=false;
if (!_dispatched)
if (_state<STATE_DISPATCHED)
updateKey();
}
}
@ -367,7 +409,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
synchronized (this)
{
_writable=false;
if (!_dispatched)
if (_state<STATE_DISPATCHED)
updateKey();
}
}
@ -514,8 +556,8 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
int current_ops=-1;
if (getChannel().isOpen())
{
boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
_interestOps =
((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
@ -764,11 +806,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
keyString += "-";
}
return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
hashCode(),
_socket.getRemoteSocketAddress(),
_socket.getLocalSocketAddress(),
_dispatched,
_state,
isOpen(),
isInputShutdown(),
isOutputShutdown(),