From f2893fdd0b1c62f7ff5010cd97a66857cd168c88 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 7 Jun 2018 22:02:44 +1000 Subject: [PATCH] Issue #2592 - Rework of WriteFlusher WriteFlusher.write() now requires the callback it is given is not null the FAILED state of WriteFlusher is now a terminal state any failure will now result in the callback being failed and a transition to the FAILED state the WriteFlusher documentation and WriteFlusherTests have also been altered to reflect these changes Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/io/WriteFlusher.java | 143 +++++++----------- .../eclipse/jetty/io/WriteFlusherTest.java | 13 +- 2 files changed, 58 insertions(+), 98 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index b3e8c358386..0f447f2713d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -25,6 +25,7 @@ import java.nio.channels.WritePendingException; import java.util.Arrays; import java.util.EnumMap; import java.util.EnumSet; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -61,7 +62,7 @@ abstract public class WriteFlusher __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE, StateType.FAILED)); __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); - __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE)); + __stateTransitions.put(StateType.FAILED, EnumSet.noneOf(StateType.class)); } // A write operation may either complete immediately: @@ -72,19 +73,11 @@ abstract public class WriteFlusher // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE // // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. - // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions, - // the callback's complete or respectively failed methods will be called. - // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state - // - // IDLE--(fail)-->IDLE - // IDLE-->WRITING--(fail)-->FAILED-->IDLE - // IDLE-->WRITING-->PENDING--(fail)-->IDLE - // IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE - // - // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE - // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be - // handled with the write or completeWrite methods try to move the state from what they thought it was. + // IDLE--(fail)-->IDLE // + // From any other state than IDLE a failure will result in an FAILED state which is a terminal state, and + // the callback is failed with the Throwable which caused the failure. + // IDLE-->WRITING--(fail)-->FAILED protected WriteFlusher(EndPoint endPoint) { @@ -120,32 +113,6 @@ abstract public class WriteFlusher return updated; } - private void fail(PendingState pending) - { - State current = _state.get(); - if (current.getType() == StateType.FAILED) - { - FailedState failed = (FailedState)current; - if (updateState(failed, __IDLE)) - { - pending.fail(failed.getCause()); - return; - } - } - throw new IllegalStateException(); - } - - private void ignoreFail() - { - State current = _state.get(); - while (current.getType() == StateType.FAILED) - { - if (updateState(current, __IDLE)) - return; - current = _state.get(); - } - } - private boolean isTransitionAllowed(State currentState, State newState) { Set allowedNewStateTypes = __stateTransitions.get(currentState.getType()); @@ -256,31 +223,10 @@ abstract public class WriteFlusher return _buffers; } - protected boolean fail(Throwable cause) - { - if (_callback != null) - { - _callback.failed(cause); - return true; - } - return false; - } - - protected void complete() - { - if (_callback != null) - _callback.succeeded(); - } - InvocationType getCallbackInvocationType() { return Invocable.getInvocationType(_callback); } - - public Object getCallback() - { - return _callback; - } } public InvocationType getCallbackInvocationType() @@ -299,7 +245,7 @@ abstract public class WriteFlusher /** * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition - * fails it'll fail the callback. + * fails it will fail the callback and leave the WriteFlusher in a terminal FAILED state. * * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. @@ -312,6 +258,8 @@ abstract public class WriteFlusher */ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException { + callback = Objects.requireNonNull(callback); + if (DEBUG) LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); @@ -322,7 +270,6 @@ abstract public class WriteFlusher { buffers = flush(buffers); - // if we are incomplete? if (buffers != null) { if (DEBUG) @@ -331,30 +278,38 @@ abstract public class WriteFlusher if (updateState(__WRITING, pending)) onIncompleteFlush(); else - fail(pending); + fail(callback); + return; } - // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__WRITING, __IDLE)) - ignoreFail(); - if (callback != null) + if (updateState(__WRITING, __IDLE)) callback.succeeded(); + else + fail(callback); } catch (IOException e) { if (DEBUG) LOG.debug("write exception", e); - if (updateState(__WRITING, __IDLE)) - { - if (callback != null) - callback.failed(e); - } + if (updateState(__WRITING, new FailedState(e))) + callback.failed(e); else - fail(new PendingState(buffers, callback)); + fail(callback, e); } } + private void fail(Callback callback, Throwable... suppressed) + { + FailedState failed = (FailedState)_state.get(); + + Throwable cause = failed.getCause(); + for(Throwable t : suppressed) + cause.addSuppressed(t); + + callback.failed(cause); + } + /** * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. @@ -377,39 +332,39 @@ abstract public class WriteFlusher if (!updateState(pending, __COMPLETING)) return; // failure already handled. + Callback callback = pending._callback; try { ByteBuffer[] buffers = pending.getBuffers(); buffers = flush(buffers); - // if we are incomplete? if (buffers != null) { if (DEBUG) LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers)); if (buffers != pending.getBuffers()) - pending = new PendingState(buffers, pending._callback); + pending = new PendingState(buffers, callback); if (updateState(__COMPLETING, pending)) onIncompleteFlush(); else - fail(pending); + fail(callback); return; } - // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__COMPLETING, __IDLE)) - ignoreFail(); - pending.complete(); + if (updateState(__COMPLETING, __IDLE)) + callback.succeeded(); + else + fail(callback); } catch (IOException e) { if (DEBUG) LOG.debug("completeWrite exception", e); - if (updateState(__COMPLETING, __IDLE)) - pending.fail(e); + if (updateState(__COMPLETING, new FailedState(e))) + callback.failed(e); else - fail(pending); + fail(callback, e); } } @@ -513,16 +468,22 @@ abstract public class WriteFlusher PendingState pending = (PendingState)current; if (updateState(pending, new FailedState(cause))) - return pending.fail(cause); + { + pending._callback.failed(cause); + return true; + } + break; + + case WRITING: + case COMPLETING: + if (DEBUG) + LOG.debug("failed: " + this, cause); + if (updateState(current, new FailedState(cause))) + return true; break; default: - if (DEBUG) - LOG.debug("failed: " + this, cause); - - if (updateState(current, new FailedState(cause))) - return false; - break; + throw new IllegalStateException(); } } } @@ -565,7 +526,7 @@ abstract public class WriteFlusher public String toString() { State s = _state.get(); - return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null); + return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s)._callback : null); } /** diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index 61280f224e6..c775dbd7ce3 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -114,7 +114,7 @@ public class WriteFlusherTest Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } Assert.assertEquals("", endPoint.takeOutputString()); - Assert.assertTrue(flusher.isIdle()); + Assert.assertTrue(flusher.isFailed()); } @Test @@ -204,7 +204,7 @@ public class WriteFlusherTest Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } Assert.assertEquals("", endPoint.takeOutputString()); - Assert.assertTrue(flusher.isIdle()); + Assert.assertTrue(flusher.isFailed()); } @Test @@ -380,7 +380,7 @@ public class WriteFlusherTest flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar")); } - @Test + @Test(expected = ExecutionException.class) public void testConcurrentWriteAndOnFail() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16); @@ -392,7 +392,7 @@ public class WriteFlusherTest { ByteBuffer[] result = super.flush(buffers); boolean notified = onFail(new Throwable()); - Assert.assertFalse(notified); + Assert.assertTrue(notified); return result; } @@ -405,10 +405,9 @@ public class WriteFlusherTest FutureCallback callback = new FutureCallback(); flusher.write(callback, BufferUtil.toBuffer("foo")); - // Callback must be successfully completed. + Assert.assertTrue(flusher.isFailed()); + callback.get(1, TimeUnit.SECONDS); - // Flusher must be idle - not failed - since the write succeeded. - Assert.assertTrue(flusher.isIdle()); } @Test