Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9
This commit is contained in:
commit
a18e28d767
|
@ -13,19 +13,13 @@
|
|||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CancelledKeyException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
|
@ -200,6 +194,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
|
|||
{
|
||||
keyString += "!";
|
||||
}
|
||||
return String.format("%s{io=%d,k=%s}",_interestOps, keyString);
|
||||
return String.format("%s{io=%d,k=%s}",super.toString(), _interestOps, keyString);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,11 +62,10 @@ abstract public class WriteFlusher
|
|||
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
|
||||
//
|
||||
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
|
||||
// If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
|
||||
// the callback's complete or respectively failed methods will be called.
|
||||
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
|
||||
// Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING
|
||||
// will discover the failure and call the callbacks before returning to IDLE
|
||||
// Thus the possible paths for a failure are:
|
||||
//
|
||||
//
|
||||
// IDLE--(fail)-->IDLE
|
||||
// IDLE-->WRITING--(fail)-->FAILED-->IDLE
|
||||
// IDLE-->WRITING-->PENDING--(fail)-->IDLE
|
||||
|
@ -94,7 +93,8 @@ abstract public class WriteFlusher
|
|||
|
||||
/**
|
||||
* Tries to update the current state to the given new state.
|
||||
* @param nextState the desired new state
|
||||
* @param previous the expected current state
|
||||
* @param next the desired new state
|
||||
* @return the previous state or null if the state transition failed
|
||||
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
|
||||
*/
|
||||
|
@ -135,10 +135,6 @@ abstract public class WriteFlusher
|
|||
private boolean isTransitionAllowed(State currentState, State newState)
|
||||
{
|
||||
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
|
||||
if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
|
||||
{
|
||||
throw new WritePendingException();
|
||||
}
|
||||
if (!allowedNewStateTypes.contains(newState.getType()))
|
||||
{
|
||||
LOG.debug("StateType update: {} -> {} not allowed", currentState, newState);
|
||||
|
@ -234,7 +230,7 @@ abstract public class WriteFlusher
|
|||
{
|
||||
private final C _context;
|
||||
private final Callback<C> _callback;
|
||||
private ByteBuffer[] _buffers;
|
||||
private final ByteBuffer[] _buffers;
|
||||
|
||||
private PendingState(ByteBuffer[] buffers, C context, Callback<C> callback)
|
||||
{
|
||||
|
@ -288,7 +284,7 @@ abstract public class WriteFlusher
|
|||
|
||||
if (!updateState(__IDLE,__WRITING))
|
||||
throw new WritePendingException();
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
_endPoint.flush(buffers);
|
||||
|
@ -333,12 +329,11 @@ abstract public class WriteFlusher
|
|||
public void completeWrite()
|
||||
{
|
||||
State previous = _state.get();
|
||||
PendingState<?> pending=null;
|
||||
|
||||
|
||||
if (previous.getType()!=StateType.PENDING)
|
||||
return; // failure already handled.
|
||||
|
||||
pending=(PendingState<?>)previous;
|
||||
PendingState<?> pending = (PendingState<?>)previous;
|
||||
if (!updateState(pending,__COMPLETING))
|
||||
return; // failure already handled.
|
||||
|
||||
|
@ -412,7 +407,7 @@ abstract public class WriteFlusher
|
|||
onFail(new ClosedChannelException());
|
||||
}
|
||||
|
||||
public boolean isIdle()
|
||||
boolean isIdle()
|
||||
{
|
||||
return _state.get().getType() == StateType.IDLE;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.eclipse.jetty.io;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritePendingException;
|
||||
|
@ -21,7 +20,6 @@ import org.eclipse.jetty.util.FutureCallback;
|
|||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
|
@ -47,13 +45,14 @@ public class WriteFlusherTest
|
|||
private WriteFlusher _flusher;
|
||||
|
||||
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
|
||||
private final String _context = new String("Context");
|
||||
private final String _context = "Context";
|
||||
private final ExecutorService executor = Executors.newFixedThreadPool(16);
|
||||
private ByteArrayEndPoint _endp;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
_endp = new ByteArrayEndPoint(new byte[]{},10);
|
||||
_endp = new ByteArrayEndPoint(new byte[]{}, 10);
|
||||
_flushIncomplete.set(false);
|
||||
_flusher = new WriteFlusher(_endp)
|
||||
{
|
||||
|
@ -72,7 +71,7 @@ public class WriteFlusherTest
|
|||
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.onFail(new IOException("Ignored because no operation in progress"));
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
assertCallbackIsDone(callback);
|
||||
assertFlushIsComplete();
|
||||
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
|
||||
|
@ -80,14 +79,14 @@ public class WriteFlusherTest
|
|||
equalTo(_endp.takeOutputString()));
|
||||
assertTrue(_flusher.isIdle());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCompleteNoBlocking() throws Exception
|
||||
{
|
||||
_endp.setGrowOutput(true);
|
||||
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
assertCallbackIsDone(callback);
|
||||
assertFlushIsComplete();
|
||||
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
|
||||
|
@ -112,21 +111,21 @@ public class WriteFlusherTest
|
|||
_endp.close();
|
||||
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
assertCallbackIsDone(callback);
|
||||
assertFlushIsComplete();
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get());
|
||||
assertEquals(_context, callback.get());
|
||||
Assert.fail();
|
||||
}
|
||||
catch(ExecutionException e)
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
Assert.assertTrue(cause instanceof IOException);
|
||||
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
|
||||
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
|
||||
}
|
||||
assertEquals("",_endp.takeOutputString());
|
||||
assertEquals("", _endp.takeOutputString());
|
||||
assertTrue(_flusher.isIdle());
|
||||
}
|
||||
|
||||
|
@ -135,14 +134,14 @@ public class WriteFlusherTest
|
|||
public void testCompleteBlocking() throws Exception
|
||||
{
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
assertFalse(callback.isDone());
|
||||
assertFalse(callback.isCancelled());
|
||||
|
||||
assertTrue(_flushIncomplete.get());
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
|
||||
assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
|
||||
Assert.fail();
|
||||
}
|
||||
catch (TimeoutException to)
|
||||
|
@ -150,11 +149,11 @@ public class WriteFlusherTest
|
|||
_flushIncomplete.set(false);
|
||||
}
|
||||
|
||||
assertEquals("How now br",_endp.takeOutputString());
|
||||
assertEquals("How now br", _endp.takeOutputString());
|
||||
_flusher.completeWrite();
|
||||
assertCallbackIsDone(callback);
|
||||
assertEquals(_context,callback.get());
|
||||
assertEquals("own cow!",_endp.takeOutputString());
|
||||
assertEquals(_context, callback.get());
|
||||
assertEquals("own cow!", _endp.takeOutputString());
|
||||
assertFlushIsComplete();
|
||||
assertTrue(_flusher.isIdle());
|
||||
}
|
||||
|
@ -163,7 +162,7 @@ public class WriteFlusherTest
|
|||
public void testCloseWhileBlocking() throws Exception
|
||||
{
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
|
||||
assertFalse(callback.isDone());
|
||||
assertFalse(callback.isCancelled());
|
||||
|
@ -171,7 +170,7 @@ public class WriteFlusherTest
|
|||
assertTrue(_flushIncomplete.get());
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
|
||||
assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
|
||||
Assert.fail();
|
||||
}
|
||||
catch (TimeoutException to)
|
||||
|
@ -179,23 +178,23 @@ public class WriteFlusherTest
|
|||
_flushIncomplete.set(false);
|
||||
}
|
||||
|
||||
assertEquals("How now br",_endp.takeOutputString());
|
||||
assertEquals("How now br", _endp.takeOutputString());
|
||||
_endp.close();
|
||||
_flusher.completeWrite();
|
||||
assertCallbackIsDone(callback);
|
||||
assertFlushIsComplete();
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get());
|
||||
assertEquals(_context, callback.get());
|
||||
Assert.fail();
|
||||
}
|
||||
catch(ExecutionException e)
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
Assert.assertTrue(cause instanceof IOException);
|
||||
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
|
||||
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
|
||||
}
|
||||
assertEquals("",_endp.takeOutputString());
|
||||
assertEquals("", _endp.takeOutputString());
|
||||
assertTrue(_flusher.isIdle());
|
||||
}
|
||||
|
||||
|
@ -203,7 +202,7 @@ public class WriteFlusherTest
|
|||
public void testFailWhileBlocking() throws Exception
|
||||
{
|
||||
FutureCallback<String> callback = new FutureCallback<>();
|
||||
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
|
||||
_flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
|
||||
|
||||
assertFalse(callback.isDone());
|
||||
assertFalse(callback.isCancelled());
|
||||
|
@ -211,7 +210,7 @@ public class WriteFlusherTest
|
|||
assertTrue(_flushIncomplete.get());
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
|
||||
assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
|
||||
Assert.fail();
|
||||
}
|
||||
catch (TimeoutException to)
|
||||
|
@ -226,17 +225,17 @@ public class WriteFlusherTest
|
|||
assertFlushIsComplete();
|
||||
try
|
||||
{
|
||||
assertEquals(_context,callback.get());
|
||||
assertEquals(_context, callback.get());
|
||||
Assert.fail();
|
||||
}
|
||||
catch(ExecutionException e)
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
Throwable cause = e.getCause();
|
||||
Assert.assertTrue(cause instanceof IOException);
|
||||
Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure"));
|
||||
Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure"));
|
||||
}
|
||||
assertEquals("", _endp.takeOutputString());
|
||||
|
||||
|
||||
assertTrue(_flusher.isIdle());
|
||||
}
|
||||
|
||||
|
@ -245,29 +244,29 @@ public class WriteFlusherTest
|
|||
final ByteArrayEndPoint _endp;
|
||||
final SecureRandom _random;
|
||||
final ScheduledThreadPoolExecutor _scheduler;
|
||||
final StringBuilder _content=new StringBuilder();
|
||||
|
||||
ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler)
|
||||
final StringBuilder _content = new StringBuilder();
|
||||
|
||||
ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler)
|
||||
{
|
||||
super(endp);
|
||||
_endp=endp;
|
||||
_random=random;
|
||||
_scheduler=scheduler;
|
||||
_endp = endp;
|
||||
_random = random;
|
||||
_scheduler = scheduler;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void onIncompleteFlushed()
|
||||
{
|
||||
_scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS);
|
||||
_scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized void run()
|
||||
{
|
||||
_content.append(_endp.takeOutputString());
|
||||
completeWrite();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized String toString()
|
||||
{
|
||||
|
@ -275,25 +274,25 @@ public class WriteFlusherTest
|
|||
return _content.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConcurrent() throws Exception
|
||||
{
|
||||
final SecureRandom random = new SecureRandom();
|
||||
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
|
||||
|
||||
|
||||
|
||||
|
||||
ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000];
|
||||
FutureCallback<?>[] futures = new FutureCallback<?>[flushers.length];
|
||||
for (int i=0;i<flushers.length;i++)
|
||||
for (int i = 0; i < flushers.length; i++)
|
||||
{
|
||||
int size=5+random.nextInt(15);
|
||||
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{},size);
|
||||
int size = 5 + random.nextInt(15);
|
||||
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{}, size);
|
||||
|
||||
final ConcurrentFlusher flusher = new ConcurrentFlusher(endp,random,scheduler);
|
||||
flushers[i]=flusher;
|
||||
final ConcurrentFlusher flusher = new ConcurrentFlusher(endp, random, scheduler);
|
||||
flushers[i] = flusher;
|
||||
final FutureCallback<String> callback = new FutureCallback<>();
|
||||
futures[i]=callback;
|
||||
futures[i] = callback;
|
||||
scheduler.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
|
@ -302,39 +301,37 @@ public class WriteFlusherTest
|
|||
flusher.onFail(new Throwable("THE CAUSE"));
|
||||
}
|
||||
}
|
||||
,random.nextInt(75)+1,TimeUnit.MILLISECONDS);
|
||||
flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
|
||||
, random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
|
||||
flusher.write(_context, callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
|
||||
}
|
||||
|
||||
int completed=0;
|
||||
int failed=0;
|
||||
|
||||
for (int i=0;i<flushers.length;i++)
|
||||
int completed = 0;
|
||||
int failed = 0;
|
||||
|
||||
for (int i = 0; i < flushers.length; i++)
|
||||
{
|
||||
try
|
||||
{
|
||||
futures[i].get();
|
||||
assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!",flushers[i].toString());
|
||||
assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].toString());
|
||||
completed++;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
assertThat(e.getMessage(),Matchers.containsString("THE CAUSE"));
|
||||
assertThat(e.getMessage(), Matchers.containsString("THE CAUSE"));
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(completed,Matchers.greaterThan(0));
|
||||
assertThat(failed,Matchers.greaterThan(0));
|
||||
|
||||
|
||||
assertThat(completed, Matchers.greaterThan(0));
|
||||
assertThat(failed, Matchers.greaterThan(0));
|
||||
|
||||
scheduler.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
|
||||
public void testConcurrentAccessToWriteAndOnFail() throws IOException, InterruptedException, ExecutionException
|
||||
{
|
||||
ExecutorService executor = Executors.newFixedThreadPool(16);
|
||||
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
|
||||
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
|
||||
final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
|
||||
|
@ -400,11 +397,10 @@ public class WriteFlusherTest
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore("Intermittent failures.") //TODO: fixme
|
||||
@Test(expected = WritePendingException.class)
|
||||
public void testConcurrentAccessToWrite() throws Throwable
|
||||
{
|
||||
ExecutorService executor = Executors.newFixedThreadPool(16);
|
||||
final CountDownLatch flushCalledLatch = new CountDownLatch(1);
|
||||
|
||||
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
|
||||
{
|
||||
|
@ -420,13 +416,16 @@ public class WriteFlusherTest
|
|||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable
|
||||
{
|
||||
flushCalledLatch.countDown();
|
||||
// make sure we stay here, so write is called twice at the same time
|
||||
Thread.sleep(5000);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
executor.submit(new Writer(writeFlusher, new FutureCallback()));
|
||||
executor.submit(new Writer(writeFlusher, new FutureCallback<String>()));
|
||||
// make sure that we call .get() on the write that executed second by waiting on this latch
|
||||
assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
try
|
||||
{
|
||||
executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
|
||||
|
@ -459,70 +458,87 @@ public class WriteFlusherTest
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
|
||||
public void testConcurrentAccessToIncompleteWriteAndOnFail() throws IOException, InterruptedException,
|
||||
ExecutionException, TimeoutException
|
||||
{
|
||||
ExecutorService executor = Executors.newFixedThreadPool(16);
|
||||
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
|
||||
final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1);
|
||||
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
|
||||
final CountDownLatch completeWrite = new CountDownLatch(1);
|
||||
|
||||
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
|
||||
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch))
|
||||
{
|
||||
protected void onIncompleteFlushed()
|
||||
{
|
||||
onIncompleteFlushedCalledLatch.countDown();
|
||||
try
|
||||
{
|
||||
failedCalledLatch.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
completeWrite();
|
||||
completeWrite.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch);
|
||||
|
||||
ExposingStateCallback callback = new ExposingStateCallback();
|
||||
executor.submit(new Writer(writeFlusher, callback));
|
||||
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
// make sure we're in pending state when calling onFail
|
||||
assertThat("onIncompleteFlushed has been called.", onIncompleteFlushedCalledLatch.await(5,
|
||||
TimeUnit.SECONDS), is(true));
|
||||
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch));
|
||||
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
|
||||
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
|
||||
// when we fail in PENDING state, we should have called callback.failed()
|
||||
assertThat("callback failed has been called", callback.isFailed(), is(true));
|
||||
assertThat("callback complete has not been called", callback.isCompleted(), is(false));
|
||||
}
|
||||
|
||||
|
||||
//TODO: combine with endPointFlushExpectation
|
||||
private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch
|
||||
failedCalledLatch)
|
||||
throws
|
||||
IOException
|
||||
private static class EndPointMock extends ByteArrayEndPoint
|
||||
{
|
||||
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
|
||||
private final CountDownLatch writeCalledLatch;
|
||||
private final CountDownLatch failedCalledLatch;
|
||||
|
||||
public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
|
||||
{
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable
|
||||
this.writeCalledLatch = writeCalledLatch;
|
||||
this.failedCalledLatch = failedCalledLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int flush(ByteBuffer... buffers) throws IOException
|
||||
{
|
||||
writeCalledLatch.countDown();
|
||||
ByteBuffer byteBuffer = buffers[0];
|
||||
int oldPos = byteBuffer.position();
|
||||
if (byteBuffer.remaining() == 2)
|
||||
{
|
||||
writeCalledLatch.countDown();
|
||||
Object[] arguments = invocation.getArguments();
|
||||
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
|
||||
int oldPos = byteBuffer.position();
|
||||
if (byteBuffer.remaining() == 2)
|
||||
// make sure failed is called before we go on
|
||||
try
|
||||
{
|
||||
// make sure failed is called before we go on
|
||||
failedCalledLatch.await(5, TimeUnit.SECONDS);
|
||||
BufferUtil.flipToFill(byteBuffer);
|
||||
}
|
||||
else if (byteBuffer.remaining() == 3)
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
byteBuffer.position(1); // pretend writing one byte
|
||||
return 1;
|
||||
e.printStackTrace();
|
||||
}
|
||||
else
|
||||
{
|
||||
byteBuffer.position(byteBuffer.limit());
|
||||
}
|
||||
return byteBuffer.limit() - oldPos;
|
||||
BufferUtil.flipToFill(byteBuffer);
|
||||
}
|
||||
});
|
||||
else if (byteBuffer.remaining() == 3)
|
||||
{
|
||||
byteBuffer.position(1); // pretend writing one byte
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
byteBuffer.position(byteBuffer.limit());
|
||||
}
|
||||
return byteBuffer.limit() - oldPos;
|
||||
}
|
||||
}
|
||||
|
||||
private static class FailedCaller implements Callable
|
||||
|
@ -550,7 +566,7 @@ public class WriteFlusherTest
|
|||
private final WriteFlusher writeFlusher;
|
||||
private FutureCallback<String> callback;
|
||||
|
||||
public Writer(WriteFlusher writeFlusher, FutureCallback callback)
|
||||
public Writer(WriteFlusher writeFlusher, FutureCallback<String> callback)
|
||||
{
|
||||
this.writeFlusher = writeFlusher;
|
||||
this.callback = callback;
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.eclipse.jetty.io.Connection;
|
|||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
public class EmptyAsyncEndPoint implements EndPoint
|
||||
public class EmptyEndPoint implements EndPoint
|
||||
{
|
||||
private boolean checkForIdle;
|
||||
private Connection connection;
|
|
@ -26,7 +26,7 @@ import org.eclipse.jetty.util.BufferUtil;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class NextProtoNegoClientAsyncConnection extends AbstractConnection implements NextProtoNego.ClientProvider
|
||||
public class NextProtoNegoClientConnection extends AbstractConnection implements NextProtoNego.ClientProvider
|
||||
{
|
||||
private final Logger logger = Log.getLogger(getClass());
|
||||
private final SocketChannel channel;
|
||||
|
@ -34,7 +34,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractConnection imple
|
|||
private final SPDYClient client;
|
||||
private volatile boolean completed;
|
||||
|
||||
public NextProtoNegoClientAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
|
||||
public NextProtoNegoClientConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client)
|
||||
{
|
||||
super(endPoint, executor);
|
||||
this.channel = channel;
|
||||
|
@ -48,7 +48,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractConnection imple
|
|||
super.onOpen();
|
||||
fillInterested();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onFillable()
|
||||
{
|
|
@ -347,7 +347,7 @@ public class SPDYClient
|
|||
};
|
||||
|
||||
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
|
||||
NextProtoNegoClientAsyncConnection connection = new NextProtoNegoClientAsyncConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client);
|
||||
NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client);
|
||||
sslEndPoint.setConnection(connection);
|
||||
connectionOpened(connection);
|
||||
|
||||
|
@ -414,7 +414,7 @@ public class SPDYClient
|
|||
Parser parser = new Parser(compressionFactory.newDecompressor());
|
||||
Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
|
||||
|
||||
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
|
||||
SPDYConnection connection = new ClientSPDYConnection(endPoint, factory.bufferPool, parser, factory);
|
||||
endPoint.setConnection(connection);
|
||||
|
||||
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
|
||||
|
@ -430,11 +430,11 @@ public class SPDYClient
|
|||
return connection;
|
||||
}
|
||||
|
||||
private class ClientSPDYAsyncConnection extends SPDYAsyncConnection
|
||||
private class ClientSPDYConnection extends SPDYConnection
|
||||
{
|
||||
private final Factory factory;
|
||||
|
||||
public ClientSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
|
||||
public ClientSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
|
||||
{
|
||||
super(endPoint, bufferPool, parser, factory.threadPool);
|
||||
this.factory = factory;
|
||||
|
|
|
@ -18,23 +18,23 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.RuntimeIOException;
|
||||
import org.eclipse.jetty.spdy.parser.Parser;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class SPDYAsyncConnection extends AbstractConnection implements Controller<StandardSession.FrameBytes>, IdleListener
|
||||
public class SPDYConnection extends AbstractConnection implements Controller<StandardSession.FrameBytes>, IdleListener
|
||||
{
|
||||
private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
|
||||
private static final Logger logger = Log.getLogger(SPDYConnection.class);
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final Parser parser;
|
||||
private volatile ISession session;
|
||||
private volatile boolean idle = false;
|
||||
|
||||
public SPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
|
||||
public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
|
||||
{
|
||||
super(endPoint, executor);
|
||||
this.bufferPool = bufferPool;
|
||||
|
@ -48,7 +48,7 @@ public class SPDYAsyncConnection extends AbstractConnection implements Controlle
|
|||
super.onOpen();
|
||||
fillInterested();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onFillable()
|
||||
{
|
|
@ -21,8 +21,6 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
|
@ -42,7 +40,7 @@ public class SPDYServerConnector extends SelectChannelConnector
|
|||
{
|
||||
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
|
||||
private final ServerSessionFrameListener listener;
|
||||
private volatile int initialWindowSize = 65536;
|
||||
private volatile int initialWindowSize;
|
||||
|
||||
public SPDYServerConnector(Server server, ServerSessionFrameListener listener)
|
||||
{
|
||||
|
@ -53,6 +51,7 @@ public class SPDYServerConnector extends SelectChannelConnector
|
|||
{
|
||||
super(server, sslContextFactory);
|
||||
this.listener = listener;
|
||||
setInitialWindowSize(65536);
|
||||
putConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), listener));
|
||||
putConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), listener));
|
||||
setDefaultConnectionFactory(getConnectionFactory("spdy/2"));
|
||||
|
@ -173,25 +172,10 @@ public class SPDYServerConnector extends SelectChannelConnector
|
|||
getSelectorManager().connectionUpgraded(endPoint, oldConnection);
|
||||
}
|
||||
|
||||
private class LazyExecutor implements Executor
|
||||
{
|
||||
@Override
|
||||
public void execute(Runnable command)
|
||||
{
|
||||
Executor threadPool = getExecutor();
|
||||
if (threadPool == null)
|
||||
throw new RejectedExecutionException();
|
||||
threadPool.execute(command);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
super.dump(out,indent);
|
||||
AggregateLifeCycle.dump(out, indent, new ArrayList<Session>(sessions));
|
||||
super.dump(out, indent);
|
||||
AggregateLifeCycle.dump(out, indent, new ArrayList<>(sessions));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ public class ServerSPDYAsyncConnectionFactory implements ConnectionFactory
|
|||
SPDYServerConnector connector = (SPDYServerConnector)attachment;
|
||||
|
||||
ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment);
|
||||
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
|
||||
SPDYConnection connection = new ServerSPDYConnection(endPoint, bufferPool, parser, listener, connector);
|
||||
endPoint.setConnection(connection);
|
||||
|
||||
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
|
||||
|
@ -83,13 +83,13 @@ public class ServerSPDYAsyncConnectionFactory implements ConnectionFactory
|
|||
return listener;
|
||||
}
|
||||
|
||||
private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection
|
||||
private static class ServerSPDYConnection extends SPDYConnection
|
||||
{
|
||||
private final ServerSessionFrameListener listener;
|
||||
private final SPDYServerConnector connector;
|
||||
private volatile boolean connected;
|
||||
|
||||
private ServerSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
|
||||
private ServerSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
|
||||
{
|
||||
super(endPoint, bufferPool, parser, connector.getExecutor());
|
||||
this.listener = listener;
|
||||
|
|
Loading…
Reference in New Issue