jetty-9: Make WriteFlusher threadsafe.

This commit is contained in:
Thomas Becker 2012-08-02 14:38:54 +02:00
parent a480e2c94d
commit 4e94601619
9 changed files with 475 additions and 188 deletions

View File

@ -85,7 +85,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{

View File

@ -170,7 +170,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
LOG.debug("{} idle timeout check, elapsed: {} ms, remaining: {} ms", this, idleElapsed, idleLeft);
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
@ -178,13 +178,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
LOG.debug("{} idle timeout expired", this);
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
if (isOutputShutdown())
close();
notIdle();
}
}
}

View File

@ -1,98 +1,239 @@
// ========================================================================
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritePendingException;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress.
*
* TODO remove synchronisation
*/
abstract public class WriteFlusher
{
private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0];
private final AtomicBoolean _writing = new AtomicBoolean(false);
private static final Logger logger = Log.getLogger(WriteFlusher.class);
private final static ByteBuffer[] NO_BUFFERS = new ByteBuffer[0];
private final EndPoint _endp;
private ByteBuffer[] _buffers;
private Object _context;
private Callback<Object> _callback;
private final AtomicReference<State> _state = new AtomicReference<>();
private final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class); //TODO: static
private final State idleState = new IdleState(); //TODO: static all of them
private final State writingState = new WritingState();
private final State failedState = new FailedState();
private final State completingState = new CompletedState();
private volatile Throwable failure;
protected WriteFlusher(EndPoint endp)
{
_endp=endp;
_state.set(idleState);
_endp = endp;
// fill the state machine
__stateTransitions.put(StateType.IDLE, new HashSet<StateType>());
__stateTransitions.put(StateType.WRITING, new HashSet<StateType>());
__stateTransitions.put(StateType.PENDING, new HashSet<StateType>());
__stateTransitions.put(StateType.COMPLETING, new HashSet<StateType>());
__stateTransitions.put(StateType.FAILED, new HashSet<StateType>());
__stateTransitions.get(StateType.IDLE).add(StateType.WRITING);
__stateTransitions.get(StateType.WRITING).add(StateType.IDLE);
__stateTransitions.get(StateType.WRITING).add(StateType.PENDING);
__stateTransitions.get(StateType.WRITING).add(StateType.FAILED);
__stateTransitions.get(StateType.PENDING).add(StateType.IDLE);
__stateTransitions.get(StateType.PENDING).add(StateType.COMPLETING);
__stateTransitions.get(StateType.PENDING).add(StateType.FAILED);
__stateTransitions.get(StateType.COMPLETING).add(StateType.IDLE);
__stateTransitions.get(StateType.COMPLETING).add(StateType.PENDING);
__stateTransitions.get(StateType.COMPLETING).add(StateType.FAILED);
__stateTransitions.get(StateType.IDLE).add(StateType.IDLE); // TODO: should never happen?! Probably remove this transition and just throw as this indicates a bug
}
private enum State
private enum StateType
{
IDLE,
WRITING,
CLOSED
PENDING,
COMPLETING,
FAILED
}
private abstract class WriteFlusherState
private State updateState(State newState)
{
private State _state;
private ByteBuffer[] _buffers;
private Object _context;
private Callback<Object> _callback;
State currentState = _state.get();
boolean updated = false;
private WriteFlusherState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback)
while (!updated)
{
_state = state;
if(!isTransitionAllowed(newState, currentState))
return null; // return false + currentState
updated = _state.compareAndSet(currentState, newState);
logger.debug("StateType update: {} -> {} {}", currentState, newState, updated ? "" : "failed");
if (!updated)
currentState = _state.get();
}
// We need to return true and currentState
return currentState;
}
private boolean isTransitionAllowed(State newState, State currentState)
{
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
{
logger.debug("WRITE PENDING EXCEPTION"); //TODO: thomas remove, we don't log and throw
throw new WritePendingException();
}
if (!allowedNewStateTypes.contains(newState.getType()))
{
logger.debug("{} -> {} not allowed.", currentState.getType(), newState.getType()); //thomas remove
return false;
}
return true;
}
private abstract class State
{
protected StateType _type;
protected ByteBuffer[] _buffers;
protected Object _context;
protected Callback<Object> _callback;
private State(StateType stateType, ByteBuffer[] buffers, Object context, Callback<Object> callback)
{
_type = stateType;
_buffers = buffers;
_context = context;
_callback = callback;
}
}
private class WriteFlusherIdleState extends WriteFlusherState
{
private WriteFlusherIdleState()
/**
* In most States this is a noop. In others it needs to be overwritten.
*
* @param cause
*/
protected void fail(Throwable cause)
{
super(null,null,null,null);
}
/**
* In most States this is a noop. In others it needs to be overwritten.
*/
protected void complete()
{
}
public StateType getType()
{
return _type;
}
public void compactBuffers()
{
this._buffers = compact(_buffers);
}
public ByteBuffer[] getBuffers()
{
return _buffers;
}
@Override
public String toString()
{
return String.format("%s", _type);
}
}
private class WriteFlusherWritingState extends WriteFlusherState
private class IdleState extends State
{
private WriteFlusherWritingState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback)
private IdleState()
{
super(state, buffers, context, callback);
super(StateType.IDLE, null, null, null);
}
}
private class WriteFlusherClosingState extends WriteFlusherState
private class WritingState extends State
{
private WriteFlusherClosingState()
private WritingState()
{
super(null,null,null,null);
super(StateType.WRITING, null, null, null);
}
}
/* ------------------------------------------------------------ */
public synchronized <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
private class FailedState extends State
{
if (callback==null)
private FailedState()
{
super(StateType.FAILED, null, null, null);
}
}
private class CompletedState extends State
{
private CompletedState()
{
super(StateType.COMPLETING, null, null, null);
}
}
private class PendingState extends State
{
private PendingState(ByteBuffer[] buffers, Object context, Callback<Object> callback)
{
super(StateType.PENDING, buffers, context, callback);
}
@Override
protected void fail(Throwable cause)
{
_callback.failed(_context, cause);
}
@Override
protected void complete()
{
_callback.completed(_context);
}
}
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
{
logger.debug("write: starting write. {}", _state); //thomas
if (callback == null)
throw new IllegalArgumentException();
if (!_writing.compareAndSet(false,true))
throw new WritePendingException();
if(updateState(writingState) == null)
{
callback.failed(context, failure);
return;
}
try
{
_endp.flush(buffers);
// Are we complete?
@ -100,160 +241,123 @@ abstract public class WriteFlusher
{
if (b.hasRemaining())
{
_buffers=buffers;
_context=context;
_callback=(Callback<Object>)callback;
_writing.set(true); // Needed as memory barrier
onIncompleteFlushed();
if(updateState(new PendingState(buffers, context, (Callback<Object>)callback)) == null)
callback.failed(context, failure);
else
onIncompleteFlushed();
return;
}
}
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
// If updateState didn't succeed, we don't care as our buffers have been written
updateState(idleState);
callback.completed(context);
}
catch (IOException e)
{
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException(e);
callback.failed(context,e);
// If updateState didn't succeed, we don't care as writing our buffers failed
updateState(failedState);
callback.failed(context, e);
}
}
/* ------------------------------------------------------------ */
/**
* Abstract call to be implemented by specific WriteFlushers.
* Abstract call to be implemented by specific WriteFlushers.
* It should schedule a call to {@link #completeWrite()} or
* {@link #failed(Throwable)} when appropriate.
*
* @return true if a flush can proceed.
*/
abstract protected void onIncompleteFlushed();
/* ------------------------------------------------------------ */
/* Remove empty buffers from the start of a multi buffer array
*/
private synchronized ByteBuffer[] compact(ByteBuffer[] buffers)
private ByteBuffer[] compact(ByteBuffer[] buffers)
{
if (buffers.length<2)
if (buffers.length < 2)
return buffers;
int b=0;
while (b<buffers.length && BufferUtil.isEmpty(buffers[b]))
int b = 0;
while (b < buffers.length && BufferUtil.isEmpty(buffers[b]))
b++;
if (b==0)
if (b == 0)
return buffers;
if (b==buffers.length)
if (b == buffers.length)
return NO_BUFFERS;
ByteBuffer[] compact=new ByteBuffer[buffers.length-b];
System.arraycopy(buffers,b,compact,0,compact.length);
ByteBuffer[] compact = new ByteBuffer[buffers.length - b];
System.arraycopy(buffers, b, compact, 0, compact.length);
return compact;
}
/* ------------------------------------------------------------ */
/**
* Complete a write that has not completed and that called
* {@link #onIncompleteFlushed()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress.
*/
public synchronized void completeWrite()
public void completeWrite()
{
if (!isWriting())
return; // TODO throw?
State currentState = updateState(completingState);
if (currentState == null || currentState.getType() != StateType.PENDING)
return;
try
{
while(true)
{
_buffers=compact(_buffers);
_endp.flush(_buffers);
currentState.compactBuffers(); //TODO: do we need it?
_endp.flush(currentState.getBuffers());
// Are we complete?
for (ByteBuffer b : _buffers)
// Are we complete?
for (ByteBuffer b : currentState.getBuffers())
{
if (b.hasRemaining())
{
if (b.hasRemaining())
{
if(updateState(currentState)==null)
currentState.fail(failure);
else
onIncompleteFlushed();
return;
}
return;
}
break;
}
// we are complete and ready
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
// If updateState didn't succeed, we don't care as our buffers have been written
updateState(idleState);
currentState.complete();
}
catch (IOException e)
{
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.failed(context,e);
// If updateState didn't succeed, we don't care as writing our buffers failed
updateState(failedState);
currentState.fail(e);
}
return;
}
/* ------------------------------------------------------------ */
/**
* Fail the write in progress and cause any calls to get to throw
* the cause wrapped as an execution exception.
* @return true if a write was in progress
*/
public synchronized boolean failed(Throwable cause)
public void failed(Throwable cause)
{
if (!_writing.compareAndSet(true,false))
return false;
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,cause);
return true;
failure = cause;
State currentState = _state.get();
logger.debug("failed: s={} e={}", _state, cause);
updateState(failedState);
currentState.fail(cause);
}
/* ------------------------------------------------------------ */
/**
* Fail the write with a {@link ClosedChannelException}. This is similar
* to a call to {@link #failed(Throwable)}, except that the exception is
* not instantiated unless a write was in progress.
* @return true if a write was in progress
*/
public synchronized boolean close()
public void close()
{
if (!_writing.compareAndSet(true,false))
return false;
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,new ClosedChannelException());
return true;
failed(new ClosedChannelException());
}
/* ------------------------------------------------------------ */
public synchronized boolean isWriting()
public boolean isWritePending()
{
return _writing.get();
return _state.get().getType() == StateType.PENDING;
}
//TODO: remove
State getState()
{
return _state.get();
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context);
return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
}
}

View File

@ -1,18 +0,0 @@
//========================================================================
//Copyright 2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.io;
@RunWith(Mockit)
public class WriteFlusherTest
{
}

View File

@ -109,7 +109,7 @@ public class SslConnection extends AbstractAsyncConnection
_appEndPoint._readInterest.readable();
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap)
if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap)
{
_appEndPoint._flushUnwrap = false;
_appEndPoint._writeFlusher.completeWrite();
@ -125,7 +125,7 @@ public class SslConnection extends AbstractAsyncConnection
if (_appEndPoint._readInterest.isInterested())
_appEndPoint._readInterest.failed(cause);
if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap)
if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap)
{
_appEndPoint._flushUnwrap = false;
_appEndPoint._writeFlusher.failed(cause);
@ -141,7 +141,7 @@ public class SslConnection extends AbstractAsyncConnection
hashCode(),
_sslEngine.getHandshakeStatus(),
_appEndPoint._readInterest.isInterested() ? "R" : "",
_appEndPoint._writeFlusher.isWriting() ? "W" : "");
_appEndPoint._writeFlusher.isWritePending() ? "W" : "");
}
/* ------------------------------------------------------------ */
@ -183,7 +183,7 @@ public class SslConnection extends AbstractAsyncConnection
_readInterest.readable();
}
if (_writeFlusher.isWriting())
if (_writeFlusher.isWritePending())
_writeFlusher.completeWrite();
}
}
@ -204,7 +204,7 @@ public class SslConnection extends AbstractAsyncConnection
_readInterest.failed(x);
}
if (_writeFlusher.isWriting())
if (_writeFlusher.isWritePending())
_writeFlusher.failed(x);
// TODO release all buffers??? or may in onClose
@ -640,7 +640,7 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public String toString()
{
return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWriting() ? "W" : "", _netWriting ? "w" : "");
return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWritePending() ? "W" : "", _netWriting ? "w" : "");
}
}

View File

@ -1,11 +1,13 @@
package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.assertFalse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -13,25 +15,41 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class WriteFlusherTest
{
@Mock
EndPoint _endPointMock;
ByteArrayEndPoint _endp;
final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
WriteFlusher _flusher;
final String _context = new String("Context");
@Before
public void before()
{
_endp = new ByteArrayEndPoint(new byte[]{},10);
_flushIncomplete.set(false);
_flusher = new WriteFlusher(_endp)
{
{
@Override
protected void onIncompleteFlushed()
{
@ -39,18 +57,12 @@ public class WriteFlusherTest
}
};
}
@After
public void after()
{
}
@Test
public void testCompleteNoBlocking() throws Exception
{
_endp.setGrowOutput(true);
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
@ -58,12 +70,12 @@ public class WriteFlusherTest
assertEquals(_context,callback.get());
assertEquals("How now brown cow!",_endp.takeOutputString());
}
@Test
public void testClosedNoBlocking() throws Exception
{
_endp.close();
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
@ -81,16 +93,16 @@ public class WriteFlusherTest
}
assertEquals("",_endp.takeOutputString());
}
@Test
public void testCompleteBlocking() throws Exception
{
{
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
assertTrue(_flushIncomplete.get());
try
{
@ -109,7 +121,7 @@ public class WriteFlusherTest
assertEquals("own cow!",_endp.takeOutputString());
assertFalse(_flushIncomplete.get());
}
@Test
public void testCloseWhileBlocking() throws Exception
{
@ -118,7 +130,7 @@ public class WriteFlusherTest
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
assertTrue(_flushIncomplete.get());
try
{
@ -157,7 +169,7 @@ public class WriteFlusherTest
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
assertTrue(_flushIncomplete.get());
try
{
@ -169,7 +181,7 @@ public class WriteFlusherTest
_flushIncomplete.set(false);
}
assertEquals("How now br",_endp.takeOutputString());
assertEquals("How now br", _endp.takeOutputString());
_flusher.failed(new IOException("Failure"));
_flusher.completeWrite();
assertTrue(callback.isDone());
@ -185,7 +197,189 @@ public class WriteFlusherTest
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure"));
}
assertEquals("",_endp.takeOutputString());
assertEquals("", _endp.takeOutputString());
}
@Test
public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
protected void onIncompleteFlushed()
{
}
};
endPointFlushExpectation(writeCalledLatch);
executor.submit(new Writer(writeFlusher, new FutureCallback()));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get();
}
@Test(expected = WritePendingException.class)
public void testConcurrentAccessToWrite() throws Throwable, InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch writeCalledLatch = new CountDownLatch(2);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
protected void onIncompleteFlushed()
{
}
};
endPointFlushExpectation(writeCalledLatch);
executor.submit(new Writer(writeFlusher, new FutureCallback()));
try
{
executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
}
catch (ExecutionException e)
{
throw e.getCause();
}
}
private void endPointFlushExpectation(final CountDownLatch writeCalledLatch) throws IOException
{
// add a small delay to make concurrent access more likely
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
Object[] arguments = invocation.getArguments();
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
BufferUtil.flipToFill(byteBuffer); // pretend everything has written
writeCalledLatch.countDown();
Thread.sleep(1000);
return null;
}
});
}
@Test
public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
protected void onIncompleteFlushed()
{
writeCalledLatch.countDown();
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " onIncompleteFlushed: calling completeWrite " + writeCalledLatch.getCount()); //thomas
try
{
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " going to sleep " + getState());
Thread.sleep(1000);
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " woken up");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " completeWrite call");
completeWrite();
completeWrite.countDown();
}
};
endPointFlushExpectationPendingWrite();
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING WRITE");
executor.submit(new Writer(writeFlusher, new FutureCallback()));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING FAILED " + writeFlusher.getState());
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch));
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling write again " + writeFlusher.getState());
writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
}
//TODO: combine with endPointFlushExpectation
private void endPointFlushExpectationPendingWrite() throws IOException
{
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
Object[] arguments = invocation.getArguments();
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{
Thread.sleep(1000);
BufferUtil.flipToFill(byteBuffer);
}
else if (byteBuffer.remaining() == 3)
{
byteBuffer.position(1); // pretend writing one byte
return 1;
}
else
{
byteBuffer.position(byteBuffer.limit());
}
return byteBuffer.limit() - oldPos;
}
});
}
private static class FailedCaller implements Callable
{
private final WriteFlusher writeFlusher;
private CountDownLatch failedCalledLatch;
public FailedCaller(WriteFlusher writeFlusher, CountDownLatch failedCalledLatch)
{
this.writeFlusher = writeFlusher;
this.failedCalledLatch = failedCalledLatch;
}
@Override
public FutureCallback call()
{
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling writeFlusher.failed()");
writeFlusher.failed(new IllegalStateException());
System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " COUNTING FAILED DOWN");
failedCalledLatch.countDown();
return null;
}
}
private class Writer implements Callable
{
private final WriteFlusher writeFlusher;
private FutureCallback<String> callback;
public Writer(WriteFlusher writeFlusher, FutureCallback callback)
{
this.writeFlusher = writeFlusher;
this.callback = callback;
}
@Override
public FutureCallback call()
{
writeFlusher.write(_context, callback, BufferUtil.toBuffer("foo"));
return callback;
}
}
}

View File

@ -0,0 +1,3 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=DEBUG
#thomas

View File

@ -1,2 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.spdy.LEVEL=WARN
org.eclipse.jetty.spdy.LEVEL=DEBUG
#org.eclipse.jetty.io.LEVEL=DEBUG
# thomas

View File

@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
//TODO: Simplify, get rid of DOING. Probably replace states with AtomicBoolean
public class FutureCallback<C> implements Future<C>,Callback<C>
{
// TODO investigate use of a phasor