435322 Fixed Iterating Callback close

This commit is contained in:
Greg Wilkins 2014-06-26 23:50:07 +02:00
parent a68bf9139b
commit 9b764ef3fa
3 changed files with 104 additions and 90 deletions

View File

@ -820,7 +820,6 @@ public class SslConnection extends AbstractConnection
} }
catch (Exception e) catch (Exception e)
{ {
getEndPoint().close();
throw e; throw e;
} }
finally finally

View File

@ -426,11 +426,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override @Override
public void onClose() public void onClose()
{ {
if (_sendCallback.isInUse()) _sendCallback.close();
{
LOG.warn("Closed with pending write:"+this);
_sendCallback.failed(new EofException("Connection closed"));
}
super.onClose(); super.onClose();
} }
@ -580,6 +576,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_header = null; _header = null;
_shutdownOut = false; _shutdownOut = false;
} }
else if (isClosed())
{
callback.failed(new EofException());
}
else else
{ {
callback.failed(new WritePendingException()); callback.failed(new WritePendingException());

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.io.EOFException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -50,6 +51,46 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public abstract class IteratingCallback implements Callback public abstract class IteratingCallback implements Callback
{ {
/**
* The internal states of this callback
*/
private enum State
{
/**
* This callback is inactive, ready to iterate.
*/
INACTIVE,
/**
* This callback is iterating and {@link #process()} has scheduled an
* asynchronous operation by returning {@link Action#SCHEDULED}, but
* the operation is still undergoing.
*/
ACTIVE,
/**
* This callback is iterating and {@link #process()} has been called
* but not returned yet.
*/
ITERATING,
/**
* While this callback was iterating, another request for iteration
* has been issued, so the iteration must continue even if a previous
* call to {@link #process()} returned {@link Action#IDLE}.
*/
ITERATE_AGAIN,
/**
* The overall job has succeeded.
*/
SUCCEEDED,
/**
* The overall job has failed.
*/
FAILED,
/**
* The ICB has been closed and cannot be reset
*/
CLOSED
}
/** /**
* The indication of the overall progress of the overall job that * The indication of the overall progress of the overall job that
* implementations of {@link #process()} must return. * implementations of {@link #process()} must return.
@ -211,8 +252,25 @@ public abstract class IteratingCallback implements Callback
case SUCCEEDED: case SUCCEEDED:
{ {
// The overall job has completed. // The overall job has completed.
completeSuccess(); while (true)
{
State current = _state.get();
switch(current)
{
case SUCCEEDED:
case FAILED:
// Already complete!.
return true; return true;
case CLOSED:
throw new IllegalStateException();
default:
if (_state.compareAndSet(current, State.SUCCEEDED))
{
onCompleteSuccess();
return true;
}
}
}
} }
default: default:
{ {
@ -260,6 +318,10 @@ public abstract class IteratingCallback implements Callback
iterate(); iterate();
return; return;
} }
case CLOSED:
// too late!
return;
default: default:
{ {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
@ -275,11 +337,6 @@ public abstract class IteratingCallback implements Callback
*/ */
@Override @Override
public final void failed(Throwable x) public final void failed(Throwable x)
{
completeFailure(x);
}
protected void completeSuccess()
{ {
while (true) while (true)
{ {
@ -288,29 +345,11 @@ public abstract class IteratingCallback implements Callback
{ {
case SUCCEEDED: case SUCCEEDED:
case FAILED: case FAILED:
case INACTIVE:
case CLOSED:
// Already complete!. // Already complete!.
return; return;
default:
if (_state.compareAndSet(current, State.SUCCEEDED))
{
onCompleteSuccess();
return;
}
}
}
}
protected void completeFailure(Throwable x)
{
while (true)
{
State current = _state.get();
switch(current)
{
case SUCCEEDED:
case FAILED:
// Already complete!.
return;
default: default:
if (_state.compareAndSet(current, State.FAILED)) if (_state.compareAndSet(current, State.FAILED))
{ {
@ -321,29 +360,41 @@ public abstract class IteratingCallback implements Callback
} }
} }
/** public final void close()
{
while (true)
{
State current = _state.get();
switch(current)
{
case INACTIVE:
case SUCCEEDED:
case FAILED:
if (_state.compareAndSet(current, State.CLOSED))
return;
break;
default:
if (_state.compareAndSet(current, State.CLOSED))
{
onCompleteFailure(new IllegalStateException("Closed with pending callback "+current));
return;
}
}
}
}
/*
* only for testing
* @return whether this callback is idle and {@link #iterate()} needs to be called * @return whether this callback is idle and {@link #iterate()} needs to be called
*/ */
public boolean isIdle() boolean isIdle()
{ {
return _state.get() == State.INACTIVE; return _state.get() == State.INACTIVE;
} }
/* ------------------------------------------------------------ */ public boolean isClosed()
/**
* @return true if the callback is not INACTIVE, FAILED or SUCCEEDED.
*/
public boolean isInUse()
{ {
switch(_state.get()) return _state.get() == State.CLOSED;
{
case INACTIVE:
case FAILED:
case SUCCEEDED:
return false;
default:
return true;
}
} }
/** /**
@ -397,40 +448,4 @@ public abstract class IteratingCallback implements Callback
{ {
return String.format("%s[%s]", super.toString(), _state); return String.format("%s[%s]", super.toString(), _state);
} }
/**
* The internal states of this callback
*/
private enum State
{
/**
* This callback is inactive, ready to iterate.
*/
INACTIVE,
/**
* This callback is iterating and {@link #process()} has scheduled an
* asynchronous operation by returning {@link Action#SCHEDULED}, but
* the operation is still undergoing.
*/
ACTIVE,
/**
* This callback is iterating and {@link #process()} has been called
* but not returned yet.
*/
ITERATING,
/**
* While this callback was iterating, another request for iteration
* has been issued, so the iteration must continue even if a previous
* call to {@link #process()} returned {@link Action#IDLE}.
*/
ITERATE_AGAIN,
/**
* The overall job has succeeded.
*/
SUCCEEDED,
/**
* The overall job has failed.
*/
FAILED
}
} }