Merge pull request #2636 from lachlan-roberts/jetty-9.4.x-2592-WriteFlusher-Cleanup
Issue #2592 - WriteFlusher Cleanup
This commit is contained in:
commit
482e207392
|
@ -25,6 +25,7 @@ import java.nio.channels.WritePendingException;
|
|||
import java.util.Arrays;
|
||||
import java.util.EnumMap;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -61,7 +62,7 @@ abstract public class WriteFlusher
|
|||
__stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
|
||||
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE, StateType.FAILED));
|
||||
__stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
|
||||
__stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
|
||||
__stateTransitions.put(StateType.FAILED, EnumSet.noneOf(StateType.class));
|
||||
}
|
||||
|
||||
// A write operation may either complete immediately:
|
||||
|
@ -72,19 +73,11 @@ abstract public class WriteFlusher
|
|||
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
|
||||
//
|
||||
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
|
||||
// If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
|
||||
// the callback's complete or respectively failed methods will be called.
|
||||
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
|
||||
//
|
||||
// IDLE--(fail)-->IDLE
|
||||
// IDLE-->WRITING--(fail)-->FAILED-->IDLE
|
||||
// IDLE-->WRITING-->PENDING--(fail)-->IDLE
|
||||
// IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
|
||||
//
|
||||
// So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
|
||||
// A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
|
||||
// handled with the write or completeWrite methods try to move the state from what they thought it was.
|
||||
// IDLE--(fail)-->IDLE
|
||||
//
|
||||
// From any other state than IDLE a failure will result in an FAILED state which is a terminal state, and
|
||||
// the callback is failed with the Throwable which caused the failure.
|
||||
// IDLE-->WRITING--(fail)-->FAILED
|
||||
|
||||
protected WriteFlusher(EndPoint endPoint)
|
||||
{
|
||||
|
@ -120,32 +113,6 @@ abstract public class WriteFlusher
|
|||
return updated;
|
||||
}
|
||||
|
||||
private void fail(PendingState pending)
|
||||
{
|
||||
State current = _state.get();
|
||||
if (current.getType() == StateType.FAILED)
|
||||
{
|
||||
FailedState failed = (FailedState)current;
|
||||
if (updateState(failed, __IDLE))
|
||||
{
|
||||
pending.fail(failed.getCause());
|
||||
return;
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
private void ignoreFail()
|
||||
{
|
||||
State current = _state.get();
|
||||
while (current.getType() == StateType.FAILED)
|
||||
{
|
||||
if (updateState(current, __IDLE))
|
||||
return;
|
||||
current = _state.get();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isTransitionAllowed(State currentState, State newState)
|
||||
{
|
||||
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
|
||||
|
@ -256,31 +223,10 @@ abstract public class WriteFlusher
|
|||
return _buffers;
|
||||
}
|
||||
|
||||
protected boolean fail(Throwable cause)
|
||||
{
|
||||
if (_callback != null)
|
||||
{
|
||||
_callback.failed(cause);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void complete()
|
||||
{
|
||||
if (_callback != null)
|
||||
_callback.succeeded();
|
||||
}
|
||||
|
||||
InvocationType getCallbackInvocationType()
|
||||
{
|
||||
return Invocable.getInvocationType(_callback);
|
||||
}
|
||||
|
||||
public Object getCallback()
|
||||
{
|
||||
return _callback;
|
||||
}
|
||||
}
|
||||
|
||||
public InvocationType getCallbackInvocationType()
|
||||
|
@ -299,7 +245,7 @@ abstract public class WriteFlusher
|
|||
|
||||
/**
|
||||
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
|
||||
* fails it'll fail the callback.
|
||||
* fails it will fail the callback and leave the WriteFlusher in a terminal FAILED state.
|
||||
*
|
||||
* If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state
|
||||
* and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
|
||||
|
@ -312,6 +258,14 @@ abstract public class WriteFlusher
|
|||
*/
|
||||
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
|
||||
{
|
||||
callback = Objects.requireNonNull(callback);
|
||||
|
||||
if(isFailed())
|
||||
{
|
||||
fail(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (DEBUG)
|
||||
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
|
||||
|
||||
|
@ -322,7 +276,6 @@ abstract public class WriteFlusher
|
|||
{
|
||||
buffers = flush(buffers);
|
||||
|
||||
// if we are incomplete?
|
||||
if (buffers != null)
|
||||
{
|
||||
if (DEBUG)
|
||||
|
@ -331,30 +284,38 @@ abstract public class WriteFlusher
|
|||
if (updateState(__WRITING, pending))
|
||||
onIncompleteFlush();
|
||||
else
|
||||
fail(pending);
|
||||
fail(callback);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If updateState didn't succeed, we don't care as our buffers have been written
|
||||
if (!updateState(__WRITING, __IDLE))
|
||||
ignoreFail();
|
||||
if (callback != null)
|
||||
if (updateState(__WRITING, __IDLE))
|
||||
callback.succeeded();
|
||||
else
|
||||
fail(callback);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
if (DEBUG)
|
||||
LOG.debug("write exception", e);
|
||||
if (updateState(__WRITING, __IDLE))
|
||||
{
|
||||
if (callback != null)
|
||||
callback.failed(e);
|
||||
}
|
||||
if (updateState(__WRITING, new FailedState(e)))
|
||||
callback.failed(e);
|
||||
else
|
||||
fail(new PendingState(buffers, callback));
|
||||
fail(callback, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void fail(Callback callback, Throwable... suppressed)
|
||||
{
|
||||
FailedState failed = (FailedState)_state.get();
|
||||
|
||||
Throwable cause = failed.getCause();
|
||||
for(Throwable t : suppressed)
|
||||
cause.addSuppressed(t);
|
||||
|
||||
callback.failed(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
|
||||
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
|
||||
|
@ -377,39 +338,39 @@ abstract public class WriteFlusher
|
|||
if (!updateState(pending, __COMPLETING))
|
||||
return; // failure already handled.
|
||||
|
||||
Callback callback = pending._callback;
|
||||
try
|
||||
{
|
||||
ByteBuffer[] buffers = pending.getBuffers();
|
||||
|
||||
buffers = flush(buffers);
|
||||
|
||||
// if we are incomplete?
|
||||
if (buffers != null)
|
||||
{
|
||||
if (DEBUG)
|
||||
LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers));
|
||||
if (buffers != pending.getBuffers())
|
||||
pending = new PendingState(buffers, pending._callback);
|
||||
pending = new PendingState(buffers, callback);
|
||||
if (updateState(__COMPLETING, pending))
|
||||
onIncompleteFlush();
|
||||
else
|
||||
fail(pending);
|
||||
fail(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
// If updateState didn't succeed, we don't care as our buffers have been written
|
||||
if (!updateState(__COMPLETING, __IDLE))
|
||||
ignoreFail();
|
||||
pending.complete();
|
||||
if (updateState(__COMPLETING, __IDLE))
|
||||
callback.succeeded();
|
||||
else
|
||||
fail(callback);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
if (DEBUG)
|
||||
LOG.debug("completeWrite exception", e);
|
||||
if (updateState(__COMPLETING, __IDLE))
|
||||
pending.fail(e);
|
||||
if (updateState(__COMPLETING, new FailedState(e)))
|
||||
callback.failed(e);
|
||||
else
|
||||
fail(pending);
|
||||
fail(callback, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -513,16 +474,22 @@ abstract public class WriteFlusher
|
|||
|
||||
PendingState pending = (PendingState)current;
|
||||
if (updateState(pending, new FailedState(cause)))
|
||||
return pending.fail(cause);
|
||||
{
|
||||
pending._callback.failed(cause);
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
|
||||
case WRITING:
|
||||
case COMPLETING:
|
||||
if (DEBUG)
|
||||
LOG.debug("failed: " + this, cause);
|
||||
if (updateState(current, new FailedState(cause)))
|
||||
return true;
|
||||
break;
|
||||
|
||||
default:
|
||||
if (DEBUG)
|
||||
LOG.debug("failed: " + this, cause);
|
||||
|
||||
if (updateState(current, new FailedState(cause)))
|
||||
return false;
|
||||
break;
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -565,7 +532,7 @@ abstract public class WriteFlusher
|
|||
public String toString()
|
||||
{
|
||||
State s = _state.get();
|
||||
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null);
|
||||
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s)._callback : null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -114,7 +114,7 @@ public class WriteFlusherTest
|
|||
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
|
||||
}
|
||||
Assert.assertEquals("", endPoint.takeOutputString());
|
||||
Assert.assertTrue(flusher.isIdle());
|
||||
Assert.assertTrue(flusher.isFailed());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -204,7 +204,7 @@ public class WriteFlusherTest
|
|||
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
|
||||
}
|
||||
Assert.assertEquals("", endPoint.takeOutputString());
|
||||
Assert.assertTrue(flusher.isIdle());
|
||||
Assert.assertTrue(flusher.isFailed());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -380,7 +380,7 @@ public class WriteFlusherTest
|
|||
flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected = ExecutionException.class)
|
||||
public void testConcurrentWriteAndOnFail() throws Exception
|
||||
{
|
||||
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
|
||||
|
@ -392,7 +392,7 @@ public class WriteFlusherTest
|
|||
{
|
||||
ByteBuffer[] result = super.flush(buffers);
|
||||
boolean notified = onFail(new Throwable());
|
||||
Assert.assertFalse(notified);
|
||||
Assert.assertTrue(notified);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -405,10 +405,9 @@ public class WriteFlusherTest
|
|||
FutureCallback callback = new FutureCallback();
|
||||
flusher.write(callback, BufferUtil.toBuffer("foo"));
|
||||
|
||||
// Callback must be successfully completed.
|
||||
Assert.assertTrue(flusher.isFailed());
|
||||
|
||||
callback.get(1, TimeUnit.SECONDS);
|
||||
// Flusher must be idle - not failed - since the write succeeded.
|
||||
Assert.assertTrue(flusher.isIdle());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue