jetty9 - WriteFlusher minor changes. Some ConcurrentTests for WriteFlusher added. Cleanup. Javadoc.

This commit is contained in:
Thomas Becker 2012-08-03 13:57:17 +02:00
parent 2d7b6c9c06
commit 9f2d1586ca
2 changed files with 115 additions and 114 deletions

View File

@ -62,11 +62,10 @@ abstract public class WriteFlusher
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
// //
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
// If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
// the callback's complete or respectively failed methods will be called.
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
// Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING //
// will discover the failure and call the callbacks before returning to IDLE
// Thus the possible paths for a failure are:
//
// IDLE--(fail)-->IDLE // IDLE--(fail)-->IDLE
// IDLE-->WRITING--(fail)-->FAILED-->IDLE // IDLE-->WRITING--(fail)-->FAILED-->IDLE
// IDLE-->WRITING-->PENDING--(fail)-->IDLE // IDLE-->WRITING-->PENDING--(fail)-->IDLE
@ -94,7 +93,8 @@ abstract public class WriteFlusher
/** /**
* Tries to update the current state to the given new state. * Tries to update the current state to the given new state.
* @param nextState the desired new state * @param previous the expected current state
* @param next the desired new state
* @return the previous state or null if the state transition failed * @return the previous state or null if the state transition failed
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
*/ */
@ -135,10 +135,6 @@ abstract public class WriteFlusher
private boolean isTransitionAllowed(State currentState, State newState) private boolean isTransitionAllowed(State currentState, State newState)
{ {
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType()); Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
{
throw new WritePendingException();
}
if (!allowedNewStateTypes.contains(newState.getType())) if (!allowedNewStateTypes.contains(newState.getType()))
{ {
LOG.debug("StateType update: {} -> {} not allowed", currentState, newState); LOG.debug("StateType update: {} -> {} not allowed", currentState, newState);
@ -288,7 +284,7 @@ abstract public class WriteFlusher
if (!updateState(__IDLE,__WRITING)) if (!updateState(__IDLE,__WRITING))
throw new WritePendingException(); throw new WritePendingException();
try try
{ {
_endPoint.flush(buffers); _endPoint.flush(buffers);
@ -333,12 +329,11 @@ abstract public class WriteFlusher
public void completeWrite() public void completeWrite()
{ {
State previous = _state.get(); State previous = _state.get();
PendingState<?> pending=null;
if (previous.getType()!=StateType.PENDING) if (previous.getType()!=StateType.PENDING)
return; // failure already handled. return; // failure already handled.
pending=(PendingState<?>)previous; PendingState<?> pending = (PendingState<?>)previous;
if (!updateState(pending,__COMPLETING)) if (!updateState(pending,__COMPLETING))
return; // failure already handled. return; // failure already handled.
@ -412,7 +407,7 @@ abstract public class WriteFlusher
onFail(new ClosedChannelException()); onFail(new ClosedChannelException());
} }
public boolean isIdle() boolean isIdle()
{ {
return _state.get().getType() == StateType.IDLE; return _state.get().getType() == StateType.IDLE;
} }

View File

@ -1,6 +1,5 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static junit.framework.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
@ -21,7 +20,6 @@ import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -47,13 +45,14 @@ public class WriteFlusherTest
private WriteFlusher _flusher; private WriteFlusher _flusher;
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false); private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
private final String _context = new String("Context"); private final String _context = "Context";
private final ExecutorService executor = Executors.newFixedThreadPool(16);
private ByteArrayEndPoint _endp; private ByteArrayEndPoint _endp;
@Before @Before
public void before() public void before()
{ {
_endp = new ByteArrayEndPoint(new byte[]{},10); _endp = new ByteArrayEndPoint(new byte[]{}, 10);
_flushIncomplete.set(false); _flushIncomplete.set(false);
_flusher = new WriteFlusher(_endp) _flusher = new WriteFlusher(_endp)
{ {
@ -72,7 +71,7 @@ public class WriteFlusherTest
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.onFail(new IOException("Ignored because no operation in progress")); _flusher.onFail(new IOException("Ignored because no operation in progress"));
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback); assertCallbackIsDone(callback);
assertFlushIsComplete(); assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
@ -80,14 +79,14 @@ public class WriteFlusherTest
equalTo(_endp.takeOutputString())); equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle()); assertTrue(_flusher.isIdle());
} }
@Test @Test
public void testCompleteNoBlocking() throws Exception public void testCompleteNoBlocking() throws Exception
{ {
_endp.setGrowOutput(true); _endp.setGrowOutput(true);
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback); assertCallbackIsDone(callback);
assertFlushIsComplete(); assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
@ -112,21 +111,21 @@ public class WriteFlusherTest
_endp.close(); _endp.close();
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback); assertCallbackIsDone(callback);
assertFlushIsComplete(); assertFlushIsComplete();
try try
{ {
assertEquals(_context,callback.get()); assertEquals(_context, callback.get());
Assert.fail(); Assert.fail();
} }
catch(ExecutionException e) catch (ExecutionException e)
{ {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException); Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
} }
assertEquals("",_endp.takeOutputString()); assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle()); assertTrue(_flusher.isIdle());
} }
@ -135,14 +134,14 @@ public class WriteFlusherTest
public void testCompleteBlocking() throws Exception public void testCompleteBlocking() throws Exception
{ {
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone()); assertFalse(callback.isDone());
assertFalse(callback.isCancelled()); assertFalse(callback.isCancelled());
assertTrue(_flushIncomplete.get()); assertTrue(_flushIncomplete.get());
try try
{ {
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail(); Assert.fail();
} }
catch (TimeoutException to) catch (TimeoutException to)
@ -150,11 +149,11 @@ public class WriteFlusherTest
_flushIncomplete.set(false); _flushIncomplete.set(false);
} }
assertEquals("How now br",_endp.takeOutputString()); assertEquals("How now br", _endp.takeOutputString());
_flusher.completeWrite(); _flusher.completeWrite();
assertCallbackIsDone(callback); assertCallbackIsDone(callback);
assertEquals(_context,callback.get()); assertEquals(_context, callback.get());
assertEquals("own cow!",_endp.takeOutputString()); assertEquals("own cow!", _endp.takeOutputString());
assertFlushIsComplete(); assertFlushIsComplete();
assertTrue(_flusher.isIdle()); assertTrue(_flusher.isIdle());
} }
@ -163,7 +162,7 @@ public class WriteFlusherTest
public void testCloseWhileBlocking() throws Exception public void testCloseWhileBlocking() throws Exception
{ {
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone()); assertFalse(callback.isDone());
assertFalse(callback.isCancelled()); assertFalse(callback.isCancelled());
@ -171,7 +170,7 @@ public class WriteFlusherTest
assertTrue(_flushIncomplete.get()); assertTrue(_flushIncomplete.get());
try try
{ {
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail(); Assert.fail();
} }
catch (TimeoutException to) catch (TimeoutException to)
@ -179,23 +178,23 @@ public class WriteFlusherTest
_flushIncomplete.set(false); _flushIncomplete.set(false);
} }
assertEquals("How now br",_endp.takeOutputString()); assertEquals("How now br", _endp.takeOutputString());
_endp.close(); _endp.close();
_flusher.completeWrite(); _flusher.completeWrite();
assertCallbackIsDone(callback); assertCallbackIsDone(callback);
assertFlushIsComplete(); assertFlushIsComplete();
try try
{ {
assertEquals(_context,callback.get()); assertEquals(_context, callback.get());
Assert.fail(); Assert.fail();
} }
catch(ExecutionException e) catch (ExecutionException e)
{ {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException); Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
} }
assertEquals("",_endp.takeOutputString()); assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle()); assertTrue(_flusher.isIdle());
} }
@ -203,7 +202,7 @@ public class WriteFlusherTest
public void testFailWhileBlocking() throws Exception public void testFailWhileBlocking() throws Exception
{ {
FutureCallback<String> callback = new FutureCallback<>(); FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone()); assertFalse(callback.isDone());
assertFalse(callback.isCancelled()); assertFalse(callback.isCancelled());
@ -211,7 +210,7 @@ public class WriteFlusherTest
assertTrue(_flushIncomplete.get()); assertTrue(_flushIncomplete.get());
try try
{ {
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail(); Assert.fail();
} }
catch (TimeoutException to) catch (TimeoutException to)
@ -226,17 +225,17 @@ public class WriteFlusherTest
assertFlushIsComplete(); assertFlushIsComplete();
try try
{ {
assertEquals(_context,callback.get()); assertEquals(_context, callback.get());
Assert.fail(); Assert.fail();
} }
catch(ExecutionException e) catch (ExecutionException e)
{ {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException); Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure")); Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure"));
} }
assertEquals("", _endp.takeOutputString()); assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle()); assertTrue(_flusher.isIdle());
} }
@ -245,29 +244,29 @@ public class WriteFlusherTest
final ByteArrayEndPoint _endp; final ByteArrayEndPoint _endp;
final SecureRandom _random; final SecureRandom _random;
final ScheduledThreadPoolExecutor _scheduler; final ScheduledThreadPoolExecutor _scheduler;
final StringBuilder _content=new StringBuilder(); final StringBuilder _content = new StringBuilder();
ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler) ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler)
{ {
super(endp); super(endp);
_endp=endp; _endp = endp;
_random=random; _random = random;
_scheduler=scheduler; _scheduler = scheduler;
} }
@Override @Override
protected void onIncompleteFlushed() protected void onIncompleteFlushed()
{ {
_scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS); _scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS);
} }
@Override @Override
public synchronized void run() public synchronized void run()
{ {
_content.append(_endp.takeOutputString()); _content.append(_endp.takeOutputString());
completeWrite(); completeWrite();
} }
@Override @Override
public synchronized String toString() public synchronized String toString()
{ {
@ -275,25 +274,25 @@ public class WriteFlusherTest
return _content.toString(); return _content.toString();
} }
} }
@Test @Test
public void testConcurrent() throws Exception public void testConcurrent() throws Exception
{ {
final SecureRandom random = new SecureRandom(); final SecureRandom random = new SecureRandom();
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000]; ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000];
FutureCallback<?>[] futures = new FutureCallback<?>[flushers.length]; FutureCallback<?>[] futures = new FutureCallback<?>[flushers.length];
for (int i=0;i<flushers.length;i++) for (int i = 0; i < flushers.length; i++)
{ {
int size=5+random.nextInt(15); int size = 5 + random.nextInt(15);
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{},size); ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{}, size);
final ConcurrentFlusher flusher = new ConcurrentFlusher(endp,random,scheduler); final ConcurrentFlusher flusher = new ConcurrentFlusher(endp, random, scheduler);
flushers[i]=flusher; flushers[i] = flusher;
final FutureCallback<String> callback = new FutureCallback<>(); final FutureCallback<String> callback = new FutureCallback<>();
futures[i]=callback; futures[i] = callback;
scheduler.schedule(new Runnable() scheduler.schedule(new Runnable()
{ {
@Override @Override
@ -302,39 +301,37 @@ public class WriteFlusherTest
flusher.onFail(new Throwable("THE CAUSE")); flusher.onFail(new Throwable("THE CAUSE"));
} }
} }
,random.nextInt(75)+1,TimeUnit.MILLISECONDS); , random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!")); flusher.write(_context, callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
} }
int completed=0; int completed = 0;
int failed=0; int failed = 0;
for (int i=0;i<flushers.length;i++) for (int i = 0; i < flushers.length; i++)
{ {
try try
{ {
futures[i].get(); futures[i].get();
assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!",flushers[i].toString()); assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].toString());
completed++; completed++;
} }
catch (Exception e) catch (Exception e)
{ {
assertThat(e.getMessage(),Matchers.containsString("THE CAUSE")); assertThat(e.getMessage(), Matchers.containsString("THE CAUSE"));
failed++; failed++;
} }
} }
assertThat(completed,Matchers.greaterThan(0)); assertThat(completed, Matchers.greaterThan(0));
assertThat(failed,Matchers.greaterThan(0)); assertThat(failed, Matchers.greaterThan(0));
scheduler.shutdown(); scheduler.shutdown();
} }
@Test @Test
@Ignore
public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{ {
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1); final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCompleteLatch = new CountDownLatch(1); final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
@ -400,11 +397,10 @@ public class WriteFlusherTest
} }
} }
@Ignore("Intermittent failures.") //TODO: fixme
@Test(expected = WritePendingException.class) @Test(expected = WritePendingException.class)
public void testConcurrentAccessToWrite() throws Throwable public void testConcurrentAccessToWrite() throws Throwable
{ {
ExecutorService executor = Executors.newFixedThreadPool(16); final CountDownLatch flushCalledLatch = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{ {
@ -420,13 +416,16 @@ public class WriteFlusherTest
@Override @Override
public Object answer(InvocationOnMock invocation) throws Throwable public Object answer(InvocationOnMock invocation) throws Throwable
{ {
flushCalledLatch.countDown();
// make sure we stay here, so write is called twice at the same time // make sure we stay here, so write is called twice at the same time
Thread.sleep(5000); Thread.sleep(5000);
return null; return null;
} }
}); });
executor.submit(new Writer(writeFlusher, new FutureCallback())); executor.submit(new Writer(writeFlusher, new FutureCallback<String>()));
// make sure that we call .get() on the write that executed second by waiting on this latch
assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true));
try try
{ {
executor.submit(new Writer(writeFlusher, new FutureCallback())).get(); executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
@ -459,16 +458,14 @@ public class WriteFlusherTest
} }
@Test @Test
@Ignore public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException
public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{ {
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1); final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1); final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1); final CountDownLatch completeWrite = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch))
{ {
protected void onIncompleteFlushed() protected void onIncompleteFlushed()
{ {
@ -478,8 +475,6 @@ public class WriteFlusherTest
} }
}; };
endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch);
ExposingStateCallback callback = new ExposingStateCallback(); ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback)); executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true)); assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
@ -487,44 +482,55 @@ public class WriteFlusherTest
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar")); writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true)); assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
callback.get(5, TimeUnit.SECONDS);
assertThat("callback failed has not been called", callback.isFailed(), is(false));
assertThat("callback complete has been called", callback.isCompleted(), is(true));
} }
private static class EndPointMock extends ByteArrayEndPoint
//TODO: combine with endPointFlushExpectation
private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch
failedCalledLatch)
throws
IOException
{ {
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>() private final CountDownLatch writeCalledLatch;
private final CountDownLatch failedCalledLatch;
public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{ {
@Override this.writeCalledLatch = writeCalledLatch;
public Object answer(InvocationOnMock invocation) throws Throwable this.failedCalledLatch = failedCalledLatch;
}
@Override
public int flush(ByteBuffer... buffers) throws IOException
{
writeCalledLatch.countDown();
ByteBuffer byteBuffer = buffers[0];
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{ {
writeCalledLatch.countDown(); // make sure failed is called before we go on
Object[] arguments = invocation.getArguments(); try
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{ {
// make sure failed is called before we go on
failedCalledLatch.await(5, TimeUnit.SECONDS); failedCalledLatch.await(5, TimeUnit.SECONDS);
BufferUtil.flipToFill(byteBuffer);
} }
else if (byteBuffer.remaining() == 3) catch (InterruptedException e)
{ {
byteBuffer.position(1); // pretend writing one byte e.printStackTrace();
return 1;
} }
else BufferUtil.flipToFill(byteBuffer);
{
byteBuffer.position(byteBuffer.limit());
}
return byteBuffer.limit() - oldPos;
} }
}); else if (byteBuffer.remaining() == 3)
{
byteBuffer.position(1); // pretend writing one byte
return 1;
}
else
{
byteBuffer.position(byteBuffer.limit());
}
return byteBuffer.limit() - oldPos;
}
} }
private static class FailedCaller implements Callable private static class FailedCaller implements Callable
{ {
private final WriteFlusher writeFlusher; private final WriteFlusher writeFlusher;
@ -550,7 +556,7 @@ public class WriteFlusherTest
private final WriteFlusher writeFlusher; private final WriteFlusher writeFlusher;
private FutureCallback<String> callback; private FutureCallback<String> callback;
public Writer(WriteFlusher writeFlusher, FutureCallback callback) public Writer(WriteFlusher writeFlusher, FutureCallback<String> callback)
{ {
this.writeFlusher = writeFlusher; this.writeFlusher = writeFlusher;
this.callback = callback; this.callback = callback;