From 4e9460161944b5ccfa4bf1b5494d96ef395a0e76 Mon Sep 17 00:00:00 2001 From: Thomas Becker Date: Thu, 2 Aug 2012 14:38:54 +0200 Subject: [PATCH] jetty-9: Make WriteFlusher threadsafe. --- .../jetty/io/AsyncByteArrayEndPoint.java | 2 +- .../jetty/io/SelectChannelEndPoint.java | 10 +- .../org/eclipse/jetty/io/WriteFlusher.java | 366 +++++++++++------- .../eclipse/jetty/io/WriteFlusherTest.java | 18 - .../eclipse/jetty/io/ssl/SslConnection.java | 12 +- .../eclipse/jetty/io/WriteFlusherTest.java | 246 ++++++++++-- .../test/resources/jetty-logging.properties | 3 + .../test/resources/jetty-logging.properties | 5 +- .../eclipse/jetty/util/FutureCallback.java | 1 + 9 files changed, 475 insertions(+), 188 deletions(-) delete mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java create mode 100644 jetty-io/src/test/resources/jetty-logging.properties diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index 52d4b924263..b1fcd447b06 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -85,7 +85,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn long idleElapsed = System.currentTimeMillis() - idleTimestamp; long idleLeft = idleTimeout - idleElapsed; - if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) + if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending()) { if (idleTimestamp != 0 && idleTimeout > 0) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index f3731633d95..683b950a199 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -170,7 +170,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, LOG.debug("{} idle timeout check, elapsed: {} ms, remaining: {} ms", this, idleElapsed, idleLeft); - if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) + if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending()) { if (idleTimestamp != 0 && idleTimeout > 0) { @@ -178,13 +178,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, { LOG.debug("{} idle timeout expired", this); - if (isOutputShutdown()) - close(); - notIdle(); - TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms"); _readInterest.failed(timeout); _writeFlusher.failed(timeout); + + if (isOutputShutdown()) + close(); + notIdle(); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 868846ce649..72d9b7e630c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -1,98 +1,239 @@ +// ======================================================================== +// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.WritePendingException; -import java.util.ConcurrentModificationException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; -/* ------------------------------------------------------------ */ /** * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been * written after a call to flush and should organise for the {@link #completeWrite()} * method to be called when a subsequent call to flush should be able to make more progress. - * - * TODO remove synchronisation */ abstract public class WriteFlusher { - private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0]; - private final AtomicBoolean _writing = new AtomicBoolean(false); + private static final Logger logger = Log.getLogger(WriteFlusher.class); + private final static ByteBuffer[] NO_BUFFERS = new ByteBuffer[0]; private final EndPoint _endp; - - private ByteBuffer[] _buffers; - private Object _context; - private Callback _callback; + private final AtomicReference _state = new AtomicReference<>(); + private final EnumMap> __stateTransitions = new EnumMap<>(StateType.class); //TODO: static + private final State idleState = new IdleState(); //TODO: static all of them + private final State writingState = new WritingState(); + private final State failedState = new FailedState(); + private final State completingState = new CompletedState(); + private volatile Throwable failure; protected WriteFlusher(EndPoint endp) { - _endp=endp; + _state.set(idleState); + _endp = endp; + + // fill the state machine + __stateTransitions.put(StateType.IDLE, new HashSet()); + __stateTransitions.put(StateType.WRITING, new HashSet()); + __stateTransitions.put(StateType.PENDING, new HashSet()); + __stateTransitions.put(StateType.COMPLETING, new HashSet()); + __stateTransitions.put(StateType.FAILED, new HashSet()); + + __stateTransitions.get(StateType.IDLE).add(StateType.WRITING); + __stateTransitions.get(StateType.WRITING).add(StateType.IDLE); + __stateTransitions.get(StateType.WRITING).add(StateType.PENDING); + __stateTransitions.get(StateType.WRITING).add(StateType.FAILED); + __stateTransitions.get(StateType.PENDING).add(StateType.IDLE); + __stateTransitions.get(StateType.PENDING).add(StateType.COMPLETING); + __stateTransitions.get(StateType.PENDING).add(StateType.FAILED); + __stateTransitions.get(StateType.COMPLETING).add(StateType.IDLE); + __stateTransitions.get(StateType.COMPLETING).add(StateType.PENDING); + __stateTransitions.get(StateType.COMPLETING).add(StateType.FAILED); + + __stateTransitions.get(StateType.IDLE).add(StateType.IDLE); // TODO: should never happen?! Probably remove this transition and just throw as this indicates a bug } - private enum State + private enum StateType { IDLE, WRITING, - CLOSED + PENDING, + COMPLETING, + FAILED } - private abstract class WriteFlusherState + private State updateState(State newState) { - private State _state; - private ByteBuffer[] _buffers; - private Object _context; - private Callback _callback; + State currentState = _state.get(); + boolean updated = false; - private WriteFlusherState(State state, ByteBuffer[] buffers, Object context, Callback callback) + while (!updated) { - _state = state; + if(!isTransitionAllowed(newState, currentState)) + return null; // return false + currentState + + updated = _state.compareAndSet(currentState, newState); + logger.debug("StateType update: {} -> {} {}", currentState, newState, updated ? "" : "failed"); + if (!updated) + currentState = _state.get(); + } + // We need to return true and currentState + return currentState; + } + + private boolean isTransitionAllowed(State newState, State currentState) + { + Set allowedNewStateTypes = __stateTransitions.get(currentState.getType()); + if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING) + { + logger.debug("WRITE PENDING EXCEPTION"); //TODO: thomas remove, we don't log and throw + throw new WritePendingException(); + } + if (!allowedNewStateTypes.contains(newState.getType())) + { + logger.debug("{} -> {} not allowed.", currentState.getType(), newState.getType()); //thomas remove + return false; + } + return true; + } + + private abstract class State + { + protected StateType _type; + protected ByteBuffer[] _buffers; + protected Object _context; + protected Callback _callback; + + private State(StateType stateType, ByteBuffer[] buffers, Object context, Callback callback) + { + _type = stateType; _buffers = buffers; _context = context; _callback = callback; } - } - private class WriteFlusherIdleState extends WriteFlusherState - { - private WriteFlusherIdleState() + /** + * In most States this is a noop. In others it needs to be overwritten. + * + * @param cause + */ + protected void fail(Throwable cause) { - super(null,null,null,null); + } + + /** + * In most States this is a noop. In others it needs to be overwritten. + */ + protected void complete() + { + } + + public StateType getType() + { + return _type; + } + + public void compactBuffers() + { + this._buffers = compact(_buffers); + } + + public ByteBuffer[] getBuffers() + { + return _buffers; + } + + @Override + public String toString() + { + return String.format("%s", _type); } } - private class WriteFlusherWritingState extends WriteFlusherState + private class IdleState extends State { - private WriteFlusherWritingState(State state, ByteBuffer[] buffers, Object context, Callback callback) + private IdleState() { - super(state, buffers, context, callback); + super(StateType.IDLE, null, null, null); } } - private class WriteFlusherClosingState extends WriteFlusherState + private class WritingState extends State { - private WriteFlusherClosingState() + private WritingState() { - super(null,null,null,null); + super(StateType.WRITING, null, null, null); } } - /* ------------------------------------------------------------ */ - public synchronized void write(C context, Callback callback, ByteBuffer... buffers) + private class FailedState extends State { - if (callback==null) + private FailedState() + { + super(StateType.FAILED, null, null, null); + } + } + + private class CompletedState extends State + { + private CompletedState() + { + super(StateType.COMPLETING, null, null, null); + } + } + + private class PendingState extends State + { + private PendingState(ByteBuffer[] buffers, Object context, Callback callback) + { + super(StateType.PENDING, buffers, context, callback); + } + + @Override + protected void fail(Throwable cause) + { + _callback.failed(_context, cause); + } + + @Override + protected void complete() + { + _callback.completed(_context); + } + } + + public void write(C context, Callback callback, ByteBuffer... buffers) + { + logger.debug("write: starting write. {}", _state); //thomas + if (callback == null) throw new IllegalArgumentException(); - if (!_writing.compareAndSet(false,true)) - throw new WritePendingException(); + if(updateState(writingState) == null) + { + callback.failed(context, failure); + return; + } try { - _endp.flush(buffers); // Are we complete? @@ -100,160 +241,123 @@ abstract public class WriteFlusher { if (b.hasRemaining()) { - _buffers=buffers; - _context=context; - _callback=(Callback)callback; - _writing.set(true); // Needed as memory barrier - onIncompleteFlushed(); + if(updateState(new PendingState(buffers, context, (Callback)callback)) == null) + callback.failed(context, failure); + else + onIncompleteFlushed(); return; } } - - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); + // If updateState didn't succeed, we don't care as our buffers have been written + updateState(idleState); callback.completed(context); } catch (IOException e) { - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(e); - callback.failed(context,e); + // If updateState didn't succeed, we don't care as writing our buffers failed + updateState(failedState); + callback.failed(context, e); } } - /* ------------------------------------------------------------ */ /** - * Abstract call to be implemented by specific WriteFlushers. + * Abstract call to be implemented by specific WriteFlushers. * It should schedule a call to {@link #completeWrite()} or * {@link #failed(Throwable)} when appropriate. + * * @return true if a flush can proceed. */ abstract protected void onIncompleteFlushed(); - /* ------------------------------------------------------------ */ /* Remove empty buffers from the start of a multi buffer array */ - private synchronized ByteBuffer[] compact(ByteBuffer[] buffers) + private ByteBuffer[] compact(ByteBuffer[] buffers) { - if (buffers.length<2) + if (buffers.length < 2) return buffers; - int b=0; - while (b callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); - callback.completed(context); + // If updateState didn't succeed, we don't care as our buffers have been written + updateState(idleState); + currentState.complete(); } catch (IOException e) { - Callback callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); - callback.failed(context,e); + // If updateState didn't succeed, we don't care as writing our buffers failed + updateState(failedState); + currentState.fail(e); } - return; } - /* ------------------------------------------------------------ */ - /** - * Fail the write in progress and cause any calls to get to throw - * the cause wrapped as an execution exception. - * @return true if a write was in progress - */ - public synchronized boolean failed(Throwable cause) + public void failed(Throwable cause) { - if (!_writing.compareAndSet(true,false)) - return false; - Callback callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - callback.failed(context,cause); - return true; + failure = cause; + State currentState = _state.get(); + logger.debug("failed: s={} e={}", _state, cause); + updateState(failedState); + currentState.fail(cause); } - /* ------------------------------------------------------------ */ - /** - * Fail the write with a {@link ClosedChannelException}. This is similar - * to a call to {@link #failed(Throwable)}, except that the exception is - * not instantiated unless a write was in progress. - * @return true if a write was in progress - */ - public synchronized boolean close() + public void close() { - if (!_writing.compareAndSet(true,false)) - return false; - Callback callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - callback.failed(context,new ClosedChannelException()); - return true; + failed(new ClosedChannelException()); } - /* ------------------------------------------------------------ */ - public synchronized boolean isWriting() + public boolean isWritePending() { - return _writing.get(); + return _state.get().getType() == StateType.PENDING; + } + + //TODO: remove + State getState() + { + return _state.get(); } - /* ------------------------------------------------------------ */ @Override public String toString() { - return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context); + return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get()); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java deleted file mode 100644 index e49693d7003..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ /dev/null @@ -1,18 +0,0 @@ -//======================================================================== -//Copyright 2012 Mort Bay Consulting Pty. Ltd. -//------------------------------------------------------------------------ -//All rights reserved. This program and the accompanying materials -//are made available under the terms of the Eclipse Public License v1.0 -//and Apache License v2.0 which accompanies this distribution. -//The Eclipse Public License is available at -//http://www.eclipse.org/legal/epl-v10.html -//The Apache License v2.0 is available at -//http://www.opensource.org/licenses/apache2.0.php -//You may elect to redistribute this code under either of these licenses. -//======================================================================== -package org.eclipse.jetty.io; - -@RunWith(Mockit) -public class WriteFlusherTest -{ -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 025e7aa027c..a6e5618faff 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -109,7 +109,7 @@ public class SslConnection extends AbstractAsyncConnection _appEndPoint._readInterest.readable(); // If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read - if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap) + if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap) { _appEndPoint._flushUnwrap = false; _appEndPoint._writeFlusher.completeWrite(); @@ -125,7 +125,7 @@ public class SslConnection extends AbstractAsyncConnection if (_appEndPoint._readInterest.isInterested()) _appEndPoint._readInterest.failed(cause); - if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap) + if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap) { _appEndPoint._flushUnwrap = false; _appEndPoint._writeFlusher.failed(cause); @@ -141,7 +141,7 @@ public class SslConnection extends AbstractAsyncConnection hashCode(), _sslEngine.getHandshakeStatus(), _appEndPoint._readInterest.isInterested() ? "R" : "", - _appEndPoint._writeFlusher.isWriting() ? "W" : ""); + _appEndPoint._writeFlusher.isWritePending() ? "W" : ""); } /* ------------------------------------------------------------ */ @@ -183,7 +183,7 @@ public class SslConnection extends AbstractAsyncConnection _readInterest.readable(); } - if (_writeFlusher.isWriting()) + if (_writeFlusher.isWritePending()) _writeFlusher.completeWrite(); } } @@ -204,7 +204,7 @@ public class SslConnection extends AbstractAsyncConnection _readInterest.failed(x); } - if (_writeFlusher.isWriting()) + if (_writeFlusher.isWritePending()) _writeFlusher.failed(x); // TODO release all buffers??? or may in onClose @@ -640,7 +640,7 @@ public class SslConnection extends AbstractAsyncConnection @Override public String toString() { - return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWriting() ? "W" : "", _netWriting ? "w" : ""); + return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWritePending() ? "W" : "", _netWriting ? "w" : ""); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index 1a4702f1136..1bf3234b412 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -1,11 +1,13 @@ package org.eclipse.jetty.io; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.assertFalse; - import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritePendingException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,25 +15,41 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; import org.hamcrest.Matchers; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class WriteFlusherTest { + @Mock + EndPoint _endPointMock; + ByteArrayEndPoint _endp; final AtomicBoolean _flushIncomplete = new AtomicBoolean(false); WriteFlusher _flusher; final String _context = new String("Context"); - + @Before public void before() { _endp = new ByteArrayEndPoint(new byte[]{},10); _flushIncomplete.set(false); _flusher = new WriteFlusher(_endp) - { + { @Override protected void onIncompleteFlushed() { @@ -39,18 +57,12 @@ public class WriteFlusherTest } }; } - - @After - public void after() - { - - } - + @Test public void testCompleteNoBlocking() throws Exception { _endp.setGrowOutput(true); - + FutureCallback callback = new FutureCallback<>(); _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); assertTrue(callback.isDone()); @@ -58,12 +70,12 @@ public class WriteFlusherTest assertEquals(_context,callback.get()); assertEquals("How now brown cow!",_endp.takeOutputString()); } - + @Test public void testClosedNoBlocking() throws Exception { _endp.close(); - + FutureCallback callback = new FutureCallback<>(); _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); assertTrue(callback.isDone()); @@ -81,16 +93,16 @@ public class WriteFlusherTest } assertEquals("",_endp.takeOutputString()); } - + @Test public void testCompleteBlocking() throws Exception - { + { FutureCallback callback = new FutureCallback<>(); _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); - + assertTrue(_flushIncomplete.get()); try { @@ -109,7 +121,7 @@ public class WriteFlusherTest assertEquals("own cow!",_endp.takeOutputString()); assertFalse(_flushIncomplete.get()); } - + @Test public void testCloseWhileBlocking() throws Exception { @@ -118,7 +130,7 @@ public class WriteFlusherTest assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); - + assertTrue(_flushIncomplete.get()); try { @@ -157,7 +169,7 @@ public class WriteFlusherTest assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); - + assertTrue(_flushIncomplete.get()); try { @@ -169,7 +181,7 @@ public class WriteFlusherTest _flushIncomplete.set(false); } - assertEquals("How now br",_endp.takeOutputString()); + assertEquals("How now br", _endp.takeOutputString()); _flusher.failed(new IOException("Failure")); _flusher.completeWrite(); assertTrue(callback.isDone()); @@ -185,7 +197,189 @@ public class WriteFlusherTest Assert.assertTrue(cause instanceof IOException); Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure")); } - assertEquals("",_endp.takeOutputString()); + assertEquals("", _endp.takeOutputString()); + } + + @Test + public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException + { + ExecutorService executor = Executors.newFixedThreadPool(16); + final CountDownLatch failedCalledLatch = new CountDownLatch(1); + final CountDownLatch writeCalledLatch = new CountDownLatch(1); + + final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) + { + @Override + protected void onIncompleteFlushed() + { + } + }; + + endPointFlushExpectation(writeCalledLatch); + + executor.submit(new Writer(writeFlusher, new FutureCallback())); + assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get(); + } + + @Test(expected = WritePendingException.class) + public void testConcurrentAccessToWrite() throws Throwable, InterruptedException, ExecutionException + { + ExecutorService executor = Executors.newFixedThreadPool(16); + final CountDownLatch writeCalledLatch = new CountDownLatch(2); + + final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) + { + @Override + protected void onIncompleteFlushed() + { + } + }; + + endPointFlushExpectation(writeCalledLatch); + + executor.submit(new Writer(writeFlusher, new FutureCallback())); + try + { + executor.submit(new Writer(writeFlusher, new FutureCallback())).get(); + } + catch (ExecutionException e) + { + throw e.getCause(); + } + } + + private void endPointFlushExpectation(final CountDownLatch writeCalledLatch) throws IOException + { + // add a small delay to make concurrent access more likely + when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + Object[] arguments = invocation.getArguments(); + ByteBuffer byteBuffer = (ByteBuffer)arguments[0]; + BufferUtil.flipToFill(byteBuffer); // pretend everything has written + writeCalledLatch.countDown(); + Thread.sleep(1000); + return null; + } + }); + } + + @Test + public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException + { + ExecutorService executor = Executors.newFixedThreadPool(16); + final CountDownLatch failedCalledLatch = new CountDownLatch(1); + final CountDownLatch writeCalledLatch = new CountDownLatch(1); + final CountDownLatch completeWrite = new CountDownLatch(1); + + final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) + { + protected void onIncompleteFlushed() + { + writeCalledLatch.countDown(); + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " onIncompleteFlushed: calling completeWrite " + writeCalledLatch.getCount()); //thomas + try + { + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " going to sleep " + getState()); + Thread.sleep(1000); + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " woken up"); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " completeWrite call"); + completeWrite(); + completeWrite.countDown(); + } + }; + + endPointFlushExpectationPendingWrite(); + + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING WRITE"); + executor.submit(new Writer(writeFlusher, new FutureCallback())); + assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING FAILED " + writeFlusher.getState()); + executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)); + assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling write again " + writeFlusher.getState()); + writeFlusher.write(_context, new FutureCallback(), BufferUtil.toBuffer("foobar")); + assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true)); + } + + + //TODO: combine with endPointFlushExpectation + private void endPointFlushExpectationPendingWrite() throws IOException + { + when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + Object[] arguments = invocation.getArguments(); + ByteBuffer byteBuffer = (ByteBuffer)arguments[0]; + int oldPos = byteBuffer.position(); + if (byteBuffer.remaining() == 2) + { + Thread.sleep(1000); + BufferUtil.flipToFill(byteBuffer); + } + else if (byteBuffer.remaining() == 3) + { + byteBuffer.position(1); // pretend writing one byte + return 1; + } + else + { + byteBuffer.position(byteBuffer.limit()); + } + return byteBuffer.limit() - oldPos; + } + }); + } + + private static class FailedCaller implements Callable + { + private final WriteFlusher writeFlusher; + private CountDownLatch failedCalledLatch; + + public FailedCaller(WriteFlusher writeFlusher, CountDownLatch failedCalledLatch) + { + this.writeFlusher = writeFlusher; + this.failedCalledLatch = failedCalledLatch; + } + + @Override + public FutureCallback call() + { + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling writeFlusher.failed()"); + writeFlusher.failed(new IllegalStateException()); + System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " COUNTING FAILED DOWN"); + failedCalledLatch.countDown(); + return null; + } + } + + private class Writer implements Callable + { + private final WriteFlusher writeFlusher; + private FutureCallback callback; + + public Writer(WriteFlusher writeFlusher, FutureCallback callback) + { + this.writeFlusher = writeFlusher; + this.callback = callback; + } + + @Override + public FutureCallback call() + { + writeFlusher.write(_context, callback, BufferUtil.toBuffer("foo")); + return callback; + } } - } diff --git a/jetty-io/src/test/resources/jetty-logging.properties b/jetty-io/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000000..5e1946805b1 --- /dev/null +++ b/jetty-io/src/test/resources/jetty-logging.properties @@ -0,0 +1,3 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog +org.eclipse.jetty.io.LEVEL=DEBUG +#thomas diff --git a/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties b/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties index 5250a08562a..6e1c0df5ed8 100644 --- a/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties +++ b/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties @@ -1,2 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.spdy.LEVEL=WARN +org.eclipse.jetty.spdy.LEVEL=DEBUG +#org.eclipse.jetty.io.LEVEL=DEBUG + +# thomas diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java index 8b5cabee7b1..d425ce3c62f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +//TODO: Simplify, get rid of DOING. Probably replace states with AtomicBoolean public class FutureCallback implements Future,Callback { // TODO investigate use of a phasor