diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 91f367c07ad..9e51aa5dbbc 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -62,11 +62,10 @@ abstract public class WriteFlusher // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE // // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. + // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions, + // the callback's complete or respectively failed methods will be called. // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state - // Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING - // will discover the failure and call the callbacks before returning to IDLE - // Thus the possible paths for a failure are: - // + // // IDLE--(fail)-->IDLE // IDLE-->WRITING--(fail)-->FAILED-->IDLE // IDLE-->WRITING-->PENDING--(fail)-->IDLE @@ -94,7 +93,8 @@ abstract public class WriteFlusher /** * Tries to update the current state to the given new state. - * @param nextState the desired new state + * @param previous the expected current state + * @param next the desired new state * @return the previous state or null if the state transition failed * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) */ @@ -135,10 +135,6 @@ abstract public class WriteFlusher private boolean isTransitionAllowed(State currentState, State newState) { Set allowedNewStateTypes = __stateTransitions.get(currentState.getType()); - if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING) - { - throw new WritePendingException(); - } if (!allowedNewStateTypes.contains(newState.getType())) { LOG.debug("StateType update: {} -> {} not allowed", currentState, newState); @@ -288,7 +284,7 @@ abstract public class WriteFlusher if (!updateState(__IDLE,__WRITING)) throw new WritePendingException(); - + try { _endPoint.flush(buffers); @@ -333,12 +329,11 @@ abstract public class WriteFlusher public void completeWrite() { State previous = _state.get(); - PendingState pending=null; - + if (previous.getType()!=StateType.PENDING) return; // failure already handled. - pending=(PendingState)previous; + PendingState pending = (PendingState)previous; if (!updateState(pending,__COMPLETING)) return; // failure already handled. @@ -412,7 +407,7 @@ abstract public class WriteFlusher onFail(new ClosedChannelException()); } - public boolean isIdle() + boolean isIdle() { return _state.get().getType() == StateType.IDLE; } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index cab02a6da9b..b24022b305e 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -1,6 +1,5 @@ package org.eclipse.jetty.io; -import static junit.framework.Assert.assertTrue; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; @@ -21,7 +20,6 @@ import org.eclipse.jetty.util.FutureCallback; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -47,13 +45,14 @@ public class WriteFlusherTest private WriteFlusher _flusher; private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false); - private final String _context = new String("Context"); + private final String _context = "Context"; + private final ExecutorService executor = Executors.newFixedThreadPool(16); private ByteArrayEndPoint _endp; @Before public void before() { - _endp = new ByteArrayEndPoint(new byte[]{},10); + _endp = new ByteArrayEndPoint(new byte[]{}, 10); _flushIncomplete.set(false); _flusher = new WriteFlusher(_endp) { @@ -72,7 +71,7 @@ public class WriteFlusherTest FutureCallback callback = new FutureCallback<>(); _flusher.onFail(new IOException("Ignored because no operation in progress")); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); @@ -80,14 +79,14 @@ public class WriteFlusherTest equalTo(_endp.takeOutputString())); assertTrue(_flusher.isIdle()); } - + @Test public void testCompleteNoBlocking() throws Exception { _endp.setGrowOutput(true); FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); @@ -112,21 +111,21 @@ public class WriteFlusherTest _endp.close(); FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } - assertEquals("",_endp.takeOutputString()); + assertEquals("", _endp.takeOutputString()); assertTrue(_flusher.isIdle()); } @@ -135,14 +134,14 @@ public class WriteFlusherTest public void testCompleteBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -150,11 +149,11 @@ public class WriteFlusherTest _flushIncomplete.set(false); } - assertEquals("How now br",_endp.takeOutputString()); + assertEquals("How now br", _endp.takeOutputString()); _flusher.completeWrite(); assertCallbackIsDone(callback); - assertEquals(_context,callback.get()); - assertEquals("own cow!",_endp.takeOutputString()); + assertEquals(_context, callback.get()); + assertEquals("own cow!", _endp.takeOutputString()); assertFlushIsComplete(); assertTrue(_flusher.isIdle()); } @@ -163,7 +162,7 @@ public class WriteFlusherTest public void testCloseWhileBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); @@ -171,7 +170,7 @@ public class WriteFlusherTest assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -179,23 +178,23 @@ public class WriteFlusherTest _flushIncomplete.set(false); } - assertEquals("How now br",_endp.takeOutputString()); + assertEquals("How now br", _endp.takeOutputString()); _endp.close(); _flusher.completeWrite(); assertCallbackIsDone(callback); assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } - assertEquals("",_endp.takeOutputString()); + assertEquals("", _endp.takeOutputString()); assertTrue(_flusher.isIdle()); } @@ -203,7 +202,7 @@ public class WriteFlusherTest public void testFailWhileBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); @@ -211,7 +210,7 @@ public class WriteFlusherTest assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -226,17 +225,17 @@ public class WriteFlusherTest assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure")); } assertEquals("", _endp.takeOutputString()); - + assertTrue(_flusher.isIdle()); } @@ -245,29 +244,29 @@ public class WriteFlusherTest final ByteArrayEndPoint _endp; final SecureRandom _random; final ScheduledThreadPoolExecutor _scheduler; - final StringBuilder _content=new StringBuilder(); - - ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler) + final StringBuilder _content = new StringBuilder(); + + ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler) { super(endp); - _endp=endp; - _random=random; - _scheduler=scheduler; + _endp = endp; + _random = random; + _scheduler = scheduler; } - + @Override protected void onIncompleteFlushed() { - _scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS); + _scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS); } - + @Override public synchronized void run() { _content.append(_endp.takeOutputString()); completeWrite(); } - + @Override public synchronized String toString() { @@ -275,25 +274,25 @@ public class WriteFlusherTest return _content.toString(); } } - + @Test public void testConcurrent() throws Exception { final SecureRandom random = new SecureRandom(); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100); - - + + ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000]; FutureCallback[] futures = new FutureCallback[flushers.length]; - for (int i=0;i callback = new FutureCallback<>(); - futures[i]=callback; + futures[i] = callback; scheduler.schedule(new Runnable() { @Override @@ -302,39 +301,37 @@ public class WriteFlusherTest flusher.onFail(new Throwable("THE CAUSE")); } } - ,random.nextInt(75)+1,TimeUnit.MILLISECONDS); - flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!")); + , random.nextInt(75) + 1, TimeUnit.MILLISECONDS); + flusher.write(_context, callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!")); } - int completed=0; - int failed=0; - - for (int i=0;i())); + // make sure that we call .get() on the write that executed second by waiting on this latch + assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true)); try { executor.submit(new Writer(writeFlusher, new FutureCallback())).get(); @@ -459,16 +458,14 @@ public class WriteFlusherTest } @Test - @Ignore - public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException + public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException { - ExecutorService executor = Executors.newFixedThreadPool(16); final CountDownLatch failedCalledLatch = new CountDownLatch(1); final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1); final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch completeWrite = new CountDownLatch(1); - final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) + final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch)) { protected void onIncompleteFlushed() { @@ -478,8 +475,6 @@ public class WriteFlusherTest } }; - endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch); - ExposingStateCallback callback = new ExposingStateCallback(); executor.submit(new Writer(writeFlusher, callback)); assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true)); @@ -487,44 +482,55 @@ public class WriteFlusherTest assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); writeFlusher.write(_context, new FutureCallback(), BufferUtil.toBuffer("foobar")); assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true)); + callback.get(5, TimeUnit.SECONDS); + assertThat("callback failed has not been called", callback.isFailed(), is(false)); + assertThat("callback complete has been called", callback.isCompleted(), is(true)); } - - //TODO: combine with endPointFlushExpectation - private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch - failedCalledLatch) - throws - IOException + private static class EndPointMock extends ByteArrayEndPoint { - when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer() + private final CountDownLatch writeCalledLatch; + private final CountDownLatch failedCalledLatch; + + public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch) { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable + this.writeCalledLatch = writeCalledLatch; + this.failedCalledLatch = failedCalledLatch; + } + + @Override + public int flush(ByteBuffer... buffers) throws IOException + { + writeCalledLatch.countDown(); + ByteBuffer byteBuffer = buffers[0]; + int oldPos = byteBuffer.position(); + if (byteBuffer.remaining() == 2) { - writeCalledLatch.countDown(); - Object[] arguments = invocation.getArguments(); - ByteBuffer byteBuffer = (ByteBuffer)arguments[0]; - int oldPos = byteBuffer.position(); - if (byteBuffer.remaining() == 2) + // make sure failed is called before we go on + try { - // make sure failed is called before we go on failedCalledLatch.await(5, TimeUnit.SECONDS); - BufferUtil.flipToFill(byteBuffer); } - else if (byteBuffer.remaining() == 3) + catch (InterruptedException e) { - byteBuffer.position(1); // pretend writing one byte - return 1; + e.printStackTrace(); } - else - { - byteBuffer.position(byteBuffer.limit()); - } - return byteBuffer.limit() - oldPos; + BufferUtil.flipToFill(byteBuffer); } - }); + else if (byteBuffer.remaining() == 3) + { + byteBuffer.position(1); // pretend writing one byte + return 1; + } + else + { + byteBuffer.position(byteBuffer.limit()); + } + return byteBuffer.limit() - oldPos; + } } + private static class FailedCaller implements Callable { private final WriteFlusher writeFlusher; @@ -550,7 +556,7 @@ public class WriteFlusherTest private final WriteFlusher writeFlusher; private FutureCallback callback; - public Writer(WriteFlusher writeFlusher, FutureCallback callback) + public Writer(WriteFlusher writeFlusher, FutureCallback callback) { this.writeFlusher = writeFlusher; this.callback = callback;