Issue #2592 - Rework of WriteFlusher

WriteFlusher.write() now requires the callback it is given is not null

the FAILED state of WriteFlusher is now a terminal state
any failure will now result in the callback being failed and a transition to the FAILED state

the WriteFlusher documentation and WriteFlusherTests have also been altered to reflect these changes

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2018-06-07 22:02:44 +10:00
parent b98dcb4155
commit f2893fdd0b
2 changed files with 58 additions and 98 deletions

View File

@ -25,6 +25,7 @@ import java.nio.channels.WritePendingException;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -61,7 +62,7 @@ abstract public class WriteFlusher
__stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE, StateType.FAILED)); __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE, StateType.FAILED));
__stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE)); __stateTransitions.put(StateType.FAILED, EnumSet.noneOf(StateType.class));
} }
// A write operation may either complete immediately: // A write operation may either complete immediately:
@ -72,19 +73,11 @@ abstract public class WriteFlusher
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
// //
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
// If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions, // IDLE--(fail)-->IDLE
// the callback's complete or respectively failed methods will be called.
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
//
// IDLE--(fail)-->IDLE
// IDLE-->WRITING--(fail)-->FAILED-->IDLE
// IDLE-->WRITING-->PENDING--(fail)-->IDLE
// IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
//
// So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
// A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
// handled with the write or completeWrite methods try to move the state from what they thought it was.
// //
// From any other state than IDLE a failure will result in an FAILED state which is a terminal state, and
// the callback is failed with the Throwable which caused the failure.
// IDLE-->WRITING--(fail)-->FAILED
protected WriteFlusher(EndPoint endPoint) protected WriteFlusher(EndPoint endPoint)
{ {
@ -120,32 +113,6 @@ abstract public class WriteFlusher
return updated; return updated;
} }
private void fail(PendingState pending)
{
State current = _state.get();
if (current.getType() == StateType.FAILED)
{
FailedState failed = (FailedState)current;
if (updateState(failed, __IDLE))
{
pending.fail(failed.getCause());
return;
}
}
throw new IllegalStateException();
}
private void ignoreFail()
{
State current = _state.get();
while (current.getType() == StateType.FAILED)
{
if (updateState(current, __IDLE))
return;
current = _state.get();
}
}
private boolean isTransitionAllowed(State currentState, State newState) private boolean isTransitionAllowed(State currentState, State newState)
{ {
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType()); Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
@ -256,31 +223,10 @@ abstract public class WriteFlusher
return _buffers; return _buffers;
} }
protected boolean fail(Throwable cause)
{
if (_callback != null)
{
_callback.failed(cause);
return true;
}
return false;
}
protected void complete()
{
if (_callback != null)
_callback.succeeded();
}
InvocationType getCallbackInvocationType() InvocationType getCallbackInvocationType()
{ {
return Invocable.getInvocationType(_callback); return Invocable.getInvocationType(_callback);
} }
public Object getCallback()
{
return _callback;
}
} }
public InvocationType getCallbackInvocationType() public InvocationType getCallbackInvocationType()
@ -299,7 +245,7 @@ abstract public class WriteFlusher
/** /**
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
* fails it'll fail the callback. * fails it will fail the callback and leave the WriteFlusher in a terminal FAILED state.
* *
* If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state
* and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
@ -312,6 +258,8 @@ abstract public class WriteFlusher
*/ */
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{ {
callback = Objects.requireNonNull(callback);
if (DEBUG) if (DEBUG)
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
@ -322,7 +270,6 @@ abstract public class WriteFlusher
{ {
buffers = flush(buffers); buffers = flush(buffers);
// if we are incomplete?
if (buffers != null) if (buffers != null)
{ {
if (DEBUG) if (DEBUG)
@ -331,30 +278,38 @@ abstract public class WriteFlusher
if (updateState(__WRITING, pending)) if (updateState(__WRITING, pending))
onIncompleteFlush(); onIncompleteFlush();
else else
fail(pending); fail(callback);
return; return;
} }
// If updateState didn't succeed, we don't care as our buffers have been written if (updateState(__WRITING, __IDLE))
if (!updateState(__WRITING, __IDLE))
ignoreFail();
if (callback != null)
callback.succeeded(); callback.succeeded();
else
fail(callback);
} }
catch (IOException e) catch (IOException e)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("write exception", e); LOG.debug("write exception", e);
if (updateState(__WRITING, __IDLE)) if (updateState(__WRITING, new FailedState(e)))
{ callback.failed(e);
if (callback != null)
callback.failed(e);
}
else else
fail(new PendingState(buffers, callback)); fail(callback, e);
} }
} }
private void fail(Callback callback, Throwable... suppressed)
{
FailedState failed = (FailedState)_state.get();
Throwable cause = failed.getCause();
for(Throwable t : suppressed)
cause.addSuppressed(t);
callback.failed(cause);
}
/** /**
* Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
@ -377,39 +332,39 @@ abstract public class WriteFlusher
if (!updateState(pending, __COMPLETING)) if (!updateState(pending, __COMPLETING))
return; // failure already handled. return; // failure already handled.
Callback callback = pending._callback;
try try
{ {
ByteBuffer[] buffers = pending.getBuffers(); ByteBuffer[] buffers = pending.getBuffers();
buffers = flush(buffers); buffers = flush(buffers);
// if we are incomplete?
if (buffers != null) if (buffers != null)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers)); LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers));
if (buffers != pending.getBuffers()) if (buffers != pending.getBuffers())
pending = new PendingState(buffers, pending._callback); pending = new PendingState(buffers, callback);
if (updateState(__COMPLETING, pending)) if (updateState(__COMPLETING, pending))
onIncompleteFlush(); onIncompleteFlush();
else else
fail(pending); fail(callback);
return; return;
} }
// If updateState didn't succeed, we don't care as our buffers have been written if (updateState(__COMPLETING, __IDLE))
if (!updateState(__COMPLETING, __IDLE)) callback.succeeded();
ignoreFail(); else
pending.complete(); fail(callback);
} }
catch (IOException e) catch (IOException e)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("completeWrite exception", e); LOG.debug("completeWrite exception", e);
if (updateState(__COMPLETING, __IDLE)) if (updateState(__COMPLETING, new FailedState(e)))
pending.fail(e); callback.failed(e);
else else
fail(pending); fail(callback, e);
} }
} }
@ -513,16 +468,22 @@ abstract public class WriteFlusher
PendingState pending = (PendingState)current; PendingState pending = (PendingState)current;
if (updateState(pending, new FailedState(cause))) if (updateState(pending, new FailedState(cause)))
return pending.fail(cause); {
pending._callback.failed(cause);
return true;
}
break;
case WRITING:
case COMPLETING:
if (DEBUG)
LOG.debug("failed: " + this, cause);
if (updateState(current, new FailedState(cause)))
return true;
break; break;
default: default:
if (DEBUG) throw new IllegalStateException();
LOG.debug("failed: " + this, cause);
if (updateState(current, new FailedState(cause)))
return false;
break;
} }
} }
} }
@ -565,7 +526,7 @@ abstract public class WriteFlusher
public String toString() public String toString()
{ {
State s = _state.get(); State s = _state.get();
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null); return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s)._callback : null);
} }
/** /**

View File

@ -114,7 +114,7 @@ public class WriteFlusherTest
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
} }
Assert.assertEquals("", endPoint.takeOutputString()); Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle()); Assert.assertTrue(flusher.isFailed());
} }
@Test @Test
@ -204,7 +204,7 @@ public class WriteFlusherTest
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
} }
Assert.assertEquals("", endPoint.takeOutputString()); Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle()); Assert.assertTrue(flusher.isFailed());
} }
@Test @Test
@ -380,7 +380,7 @@ public class WriteFlusherTest
flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar")); flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar"));
} }
@Test @Test(expected = ExecutionException.class)
public void testConcurrentWriteAndOnFail() throws Exception public void testConcurrentWriteAndOnFail() throws Exception
{ {
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16); ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
@ -392,7 +392,7 @@ public class WriteFlusherTest
{ {
ByteBuffer[] result = super.flush(buffers); ByteBuffer[] result = super.flush(buffers);
boolean notified = onFail(new Throwable()); boolean notified = onFail(new Throwable());
Assert.assertFalse(notified); Assert.assertTrue(notified);
return result; return result;
} }
@ -405,10 +405,9 @@ public class WriteFlusherTest
FutureCallback callback = new FutureCallback(); FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("foo")); flusher.write(callback, BufferUtil.toBuffer("foo"));
// Callback must be successfully completed. Assert.assertTrue(flusher.isFailed());
callback.get(1, TimeUnit.SECONDS); callback.get(1, TimeUnit.SECONDS);
// Flusher must be idle - not failed - since the write succeeded.
Assert.assertTrue(flusher.isIdle());
} }
@Test @Test