From 9b764ef3fa66d74d7fe6af0d09cd9f8e5001552b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 26 Jun 2014 23:50:07 +0200 Subject: [PATCH] 435322 Fixed Iterating Callback close --- .../eclipse/jetty/io/ssl/SslConnection.java | 1 - .../eclipse/jetty/server/HttpConnection.java | 10 +- .../eclipse/jetty/util/IteratingCallback.java | 183 ++++++++++-------- 3 files changed, 104 insertions(+), 90 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index ee9e449b151..4559c22b85f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -820,7 +820,6 @@ public class SslConnection extends AbstractConnection } catch (Exception e) { - getEndPoint().close(); throw e; } finally diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 4d41cce3ab8..02fa663a1fb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -426,11 +426,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void onClose() { - if (_sendCallback.isInUse()) - { - LOG.warn("Closed with pending write:"+this); - _sendCallback.failed(new EofException("Connection closed")); - } + _sendCallback.close(); super.onClose(); } @@ -580,6 +576,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _header = null; _shutdownOut = false; } + else if (isClosed()) + { + callback.failed(new EofException()); + } else { callback.failed(new WritePendingException()); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 1b0837bc4f7..71d41c735d0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.util; +import java.io.EOFException; import java.util.concurrent.atomic.AtomicReference; /** @@ -50,6 +51,46 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class IteratingCallback implements Callback { + + /** + * The internal states of this callback + */ + private enum State + { + /** + * This callback is inactive, ready to iterate. + */ + INACTIVE, + /** + * This callback is iterating and {@link #process()} has scheduled an + * asynchronous operation by returning {@link Action#SCHEDULED}, but + * the operation is still undergoing. + */ + ACTIVE, + /** + * This callback is iterating and {@link #process()} has been called + * but not returned yet. + */ + ITERATING, + /** + * While this callback was iterating, another request for iteration + * has been issued, so the iteration must continue even if a previous + * call to {@link #process()} returned {@link Action#IDLE}. + */ + ITERATE_AGAIN, + /** + * The overall job has succeeded. + */ + SUCCEEDED, + /** + * The overall job has failed. + */ + FAILED, + /** + * The ICB has been closed and cannot be reset + */ + CLOSED + } /** * The indication of the overall progress of the overall job that * implementations of {@link #process()} must return. @@ -211,8 +252,25 @@ public abstract class IteratingCallback implements Callback case SUCCEEDED: { // The overall job has completed. - completeSuccess(); - return true; + while (true) + { + State current = _state.get(); + switch(current) + { + case SUCCEEDED: + case FAILED: + // Already complete!. + return true; + case CLOSED: + throw new IllegalStateException(); + default: + if (_state.compareAndSet(current, State.SUCCEEDED)) + { + onCompleteSuccess(); + return true; + } + } + } } default: { @@ -260,6 +318,10 @@ public abstract class IteratingCallback implements Callback iterate(); return; } + case CLOSED: + // too late! + return; + default: { throw new IllegalStateException(toString()); @@ -275,32 +337,6 @@ public abstract class IteratingCallback implements Callback */ @Override public final void failed(Throwable x) - { - completeFailure(x); - } - - protected void completeSuccess() - { - while (true) - { - State current = _state.get(); - switch(current) - { - case SUCCEEDED: - case FAILED: - // Already complete!. - return; - default: - if (_state.compareAndSet(current, State.SUCCEEDED)) - { - onCompleteSuccess(); - return; - } - } - } - } - - protected void completeFailure(Throwable x) { while (true) { @@ -309,8 +345,11 @@ public abstract class IteratingCallback implements Callback { case SUCCEEDED: case FAILED: + case INACTIVE: + case CLOSED: // Already complete!. return; + default: if (_state.compareAndSet(current, State.FAILED)) { @@ -321,31 +360,43 @@ public abstract class IteratingCallback implements Callback } } - /** - * @return whether this callback is idle and {@link #iterate()} needs to be called - */ - public boolean isIdle() + public final void close() { - return _state.get() == State.INACTIVE; - } - - /* ------------------------------------------------------------ */ - /** - * @return true if the callback is not INACTIVE, FAILED or SUCCEEDED. - */ - public boolean isInUse() - { - switch(_state.get()) + while (true) { - case INACTIVE: - case FAILED: - case SUCCEEDED: - return false; - default: - return true; + State current = _state.get(); + switch(current) + { + case INACTIVE: + case SUCCEEDED: + case FAILED: + if (_state.compareAndSet(current, State.CLOSED)) + return; + break; + default: + if (_state.compareAndSet(current, State.CLOSED)) + { + onCompleteFailure(new IllegalStateException("Closed with pending callback "+current)); + return; + } + } } } + /* + * only for testing + * @return whether this callback is idle and {@link #iterate()} needs to be called + */ + boolean isIdle() + { + return _state.get() == State.INACTIVE; + } + + public boolean isClosed() + { + return _state.get() == State.CLOSED; + } + /** * @return whether this callback has failed */ @@ -397,40 +448,4 @@ public abstract class IteratingCallback implements Callback { return String.format("%s[%s]", super.toString(), _state); } - - /** - * The internal states of this callback - */ - private enum State - { - /** - * This callback is inactive, ready to iterate. - */ - INACTIVE, - /** - * This callback is iterating and {@link #process()} has scheduled an - * asynchronous operation by returning {@link Action#SCHEDULED}, but - * the operation is still undergoing. - */ - ACTIVE, - /** - * This callback is iterating and {@link #process()} has been called - * but not returned yet. - */ - ITERATING, - /** - * While this callback was iterating, another request for iteration - * has been issued, so the iteration must continue even if a previous - * call to {@link #process()} returned {@link Action#IDLE}. - */ - ITERATE_AGAIN, - /** - * The overall job has succeeded. - */ - SUCCEEDED, - /** - * The overall job has failed. - */ - FAILED - } }