Improved failsafe close handling for half closed endpoints

This commit is contained in:
Greg Wilkins 2014-04-24 10:01:46 +02:00
parent aa394123cb
commit 61b2e7f75e
4 changed files with 40 additions and 16 deletions

View File

@ -44,6 +44,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
return AbstractEndPoint.this.needsFill(); return AbstractEndPoint.this.needsFill();
} }
}; };
private final WriteFlusher _writeFlusher = new WriteFlusher(this) private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{ {
@Override @Override
@ -142,9 +143,22 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
@Override @Override
protected void onIdleExpired(TimeoutException timeout) protected void onIdleExpired(TimeoutException timeout)
{ {
// Note: Rely on fillInterest to notify onReadTimeout to close connection. boolean output_shutdown=isOutputShutdown();
_fillInterest.onFail(timeout); boolean input_shutdown=isInputShutdown();
_writeFlusher.onFail(timeout); boolean fillFailed = _fillInterest.onFail(timeout);
boolean writeFailed = _writeFlusher.onFail(timeout);
// If the endpoint is half closed and there was no onFail handling, the close here
// This handles the situation where the connection has completed its close handling
// and the endpoint is half closed, but the other party does not complete the close.
// This perhaps should not check for half closed, however the servlet spec case allows
// for a dispatched servlet or suspended request to extend beyond the connections idle
// time. So if this test would always close an idle endpoint that is not handled, then
// we would need a mode to ignore timeouts for some HTTP states
if (isOpen() && (output_shutdown || input_shutdown) && !(fillFailed || writeFailed))
close();
else
LOG.debug("Ignored idle endpoint {}",this);
} }
@Override @Override

View File

@ -93,12 +93,17 @@ public abstract class FillInterest
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Call to signal a failure to a registered interest /** Call to signal a failure to a registered interest
* @return true if the cause was passed to a {@link Callback} instance
*/ */
public void onFail(Throwable cause) public boolean onFail(Throwable cause)
{ {
Callback callback=_interested.get(); Callback callback=_interested.get();
if (callback!=null && _interested.compareAndSet(callback,null)) if (callback!=null && _interested.compareAndSet(callback,null))
{
callback.failed(cause); callback.failed(cause);
return true;
}
return false;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -253,10 +253,14 @@ abstract public class WriteFlusher
return _buffers; return _buffers;
} }
protected void fail(Throwable cause) protected boolean fail(Throwable cause)
{ {
if (_callback!=null) if (_callback!=null)
{
_callback.failed(cause); _callback.failed(cause);
return true;
}
return false;
} }
protected void complete() protected void complete()
@ -430,7 +434,12 @@ abstract public class WriteFlusher
} }
} }
public void onFail(Throwable cause) /* ------------------------------------------------------------ */
/** Notify the flusher of a failure
* @param cause The cause of the failure
* @return true if the flusher passed the failure to a {@link Callback} instance
*/
public boolean onFail(Throwable cause)
{ {
// Keep trying to handle the failure until we get to IDLE or FAILED state // Keep trying to handle the failure until we get to IDLE or FAILED state
while(true) while(true)
@ -442,7 +451,7 @@ abstract public class WriteFlusher
case FAILED: case FAILED:
if (DEBUG) if (DEBUG)
LOG.debug("ignored: {} {}", this, cause); LOG.debug("ignored: {} {}", this, cause);
return; return false;
case PENDING: case PENDING:
if (DEBUG) if (DEBUG)
@ -450,10 +459,7 @@ abstract public class WriteFlusher
PendingState pending = (PendingState)current; PendingState pending = (PendingState)current;
if (updateState(pending,__IDLE)) if (updateState(pending,__IDLE))
{ return pending.fail(cause);
pending.fail(cause);
return;
}
break; break;
default: default:
@ -461,7 +467,7 @@ abstract public class WriteFlusher
LOG.debug("failed: {} {}", this, cause); LOG.debug("failed: {} {}", this, cause);
if (updateState(current,new FailedState(cause))) if (updateState(current,new FailedState(cause)))
return; return false;
break; break;
} }
} }

View File

@ -19,8 +19,6 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -37,8 +35,6 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After; import org.junit.After;
@ -132,6 +128,7 @@ public class ByteArrayEndPointTest
assertEquals(true,endp.flush(BufferUtil.EMPTY_BUFFER,BufferUtil.toBuffer(" and"),BufferUtil.toBuffer(" more"))); assertEquals(true,endp.flush(BufferUtil.EMPTY_BUFFER,BufferUtil.toBuffer(" and"),BufferUtil.toBuffer(" more")));
assertEquals("some output some more and more",endp.getOutputString()); assertEquals("some output some more and more",endp.getOutputString());
endp.close();
} }
@Test @Test
@ -150,6 +147,7 @@ public class ByteArrayEndPointTest
assertEquals(true,endp.flush(data)); assertEquals(true,endp.flush(data));
assertEquals("data.",BufferUtil.toString(endp.takeOutput())); assertEquals("data.",BufferUtil.toString(endp.takeOutput()));
endp.close();
} }
@ -237,6 +235,7 @@ public class ByteArrayEndPointTest
assertTrue(fcb.isDone()); assertTrue(fcb.isDone());
assertEquals(null, fcb.get()); assertEquals(null, fcb.get());
assertEquals(" more.", endp.getOutputString()); assertEquals(" more.", endp.getOutputString());
endp.close();
} }
/** /**