diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 88854563c89..9a574dc2e1e 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -13,19 +13,13 @@ package org.eclipse.jetty.io; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.SelectorManager.ManagedSelector; -import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -200,6 +194,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa { keyString += "!"; } - return String.format("%s{io=%d,k=%s}",_interestOps, keyString); + return String.format("%s{io=%d,k=%s}",super.toString(), _interestOps, keyString); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 91f367c07ad..eec6ee6b506 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -62,11 +62,10 @@ abstract public class WriteFlusher // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE // // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. + // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions, + // the callback's complete or respectively failed methods will be called. // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state - // Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING - // will discover the failure and call the callbacks before returning to IDLE - // Thus the possible paths for a failure are: - // + // // IDLE--(fail)-->IDLE // IDLE-->WRITING--(fail)-->FAILED-->IDLE // IDLE-->WRITING-->PENDING--(fail)-->IDLE @@ -94,7 +93,8 @@ abstract public class WriteFlusher /** * Tries to update the current state to the given new state. - * @param nextState the desired new state + * @param previous the expected current state + * @param next the desired new state * @return the previous state or null if the state transition failed * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) */ @@ -135,10 +135,6 @@ abstract public class WriteFlusher private boolean isTransitionAllowed(State currentState, State newState) { Set allowedNewStateTypes = __stateTransitions.get(currentState.getType()); - if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING) - { - throw new WritePendingException(); - } if (!allowedNewStateTypes.contains(newState.getType())) { LOG.debug("StateType update: {} -> {} not allowed", currentState, newState); @@ -234,7 +230,7 @@ abstract public class WriteFlusher { private final C _context; private final Callback _callback; - private ByteBuffer[] _buffers; + private final ByteBuffer[] _buffers; private PendingState(ByteBuffer[] buffers, C context, Callback callback) { @@ -288,7 +284,7 @@ abstract public class WriteFlusher if (!updateState(__IDLE,__WRITING)) throw new WritePendingException(); - + try { _endPoint.flush(buffers); @@ -333,12 +329,11 @@ abstract public class WriteFlusher public void completeWrite() { State previous = _state.get(); - PendingState pending=null; - + if (previous.getType()!=StateType.PENDING) return; // failure already handled. - pending=(PendingState)previous; + PendingState pending = (PendingState)previous; if (!updateState(pending,__COMPLETING)) return; // failure already handled. @@ -412,7 +407,7 @@ abstract public class WriteFlusher onFail(new ClosedChannelException()); } - public boolean isIdle() + boolean isIdle() { return _state.get().getType() == StateType.IDLE; } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index cab02a6da9b..da46c071fcf 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -1,6 +1,5 @@ package org.eclipse.jetty.io; -import static junit.framework.Assert.assertTrue; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; @@ -21,7 +20,6 @@ import org.eclipse.jetty.util.FutureCallback; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -47,13 +45,14 @@ public class WriteFlusherTest private WriteFlusher _flusher; private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false); - private final String _context = new String("Context"); + private final String _context = "Context"; + private final ExecutorService executor = Executors.newFixedThreadPool(16); private ByteArrayEndPoint _endp; @Before public void before() { - _endp = new ByteArrayEndPoint(new byte[]{},10); + _endp = new ByteArrayEndPoint(new byte[]{}, 10); _flushIncomplete.set(false); _flusher = new WriteFlusher(_endp) { @@ -72,7 +71,7 @@ public class WriteFlusherTest FutureCallback callback = new FutureCallback<>(); _flusher.onFail(new IOException("Ignored because no operation in progress")); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); @@ -80,14 +79,14 @@ public class WriteFlusherTest equalTo(_endp.takeOutputString())); assertTrue(_flusher.isIdle()); } - + @Test public void testCompleteNoBlocking() throws Exception { _endp.setGrowOutput(true); FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); assertThat("context and callback.get() are equal", _context, equalTo(callback.get())); @@ -112,21 +111,21 @@ public class WriteFlusherTest _endp.close(); FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertCallbackIsDone(callback); assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } - assertEquals("",_endp.takeOutputString()); + assertEquals("", _endp.takeOutputString()); assertTrue(_flusher.isIdle()); } @@ -135,14 +134,14 @@ public class WriteFlusherTest public void testCompleteBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -150,11 +149,11 @@ public class WriteFlusherTest _flushIncomplete.set(false); } - assertEquals("How now br",_endp.takeOutputString()); + assertEquals("How now br", _endp.takeOutputString()); _flusher.completeWrite(); assertCallbackIsDone(callback); - assertEquals(_context,callback.get()); - assertEquals("own cow!",_endp.takeOutputString()); + assertEquals(_context, callback.get()); + assertEquals("own cow!", _endp.takeOutputString()); assertFlushIsComplete(); assertTrue(_flusher.isIdle()); } @@ -163,7 +162,7 @@ public class WriteFlusherTest public void testCloseWhileBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); @@ -171,7 +170,7 @@ public class WriteFlusherTest assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -179,23 +178,23 @@ public class WriteFlusherTest _flushIncomplete.set(false); } - assertEquals("How now br",_endp.takeOutputString()); + assertEquals("How now br", _endp.takeOutputString()); _endp.close(); _flusher.completeWrite(); assertCallbackIsDone(callback); assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED")); } - assertEquals("",_endp.takeOutputString()); + assertEquals("", _endp.takeOutputString()); assertTrue(_flusher.isIdle()); } @@ -203,7 +202,7 @@ public class WriteFlusherTest public void testFailWhileBlocking() throws Exception { FutureCallback callback = new FutureCallback<>(); - _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!")); + _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!")); assertFalse(callback.isDone()); assertFalse(callback.isCancelled()); @@ -211,7 +210,7 @@ public class WriteFlusherTest assertTrue(_flushIncomplete.get()); try { - assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS)); + assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS)); Assert.fail(); } catch (TimeoutException to) @@ -226,17 +225,17 @@ public class WriteFlusherTest assertFlushIsComplete(); try { - assertEquals(_context,callback.get()); + assertEquals(_context, callback.get()); Assert.fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { Throwable cause = e.getCause(); Assert.assertTrue(cause instanceof IOException); - Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure")); + Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure")); } assertEquals("", _endp.takeOutputString()); - + assertTrue(_flusher.isIdle()); } @@ -245,29 +244,29 @@ public class WriteFlusherTest final ByteArrayEndPoint _endp; final SecureRandom _random; final ScheduledThreadPoolExecutor _scheduler; - final StringBuilder _content=new StringBuilder(); - - ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler) + final StringBuilder _content = new StringBuilder(); + + ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler) { super(endp); - _endp=endp; - _random=random; - _scheduler=scheduler; + _endp = endp; + _random = random; + _scheduler = scheduler; } - + @Override protected void onIncompleteFlushed() { - _scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS); + _scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS); } - + @Override public synchronized void run() { _content.append(_endp.takeOutputString()); completeWrite(); } - + @Override public synchronized String toString() { @@ -275,25 +274,25 @@ public class WriteFlusherTest return _content.toString(); } } - + @Test public void testConcurrent() throws Exception { final SecureRandom random = new SecureRandom(); final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100); - - + + ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000]; FutureCallback[] futures = new FutureCallback[flushers.length]; - for (int i=0;i callback = new FutureCallback<>(); - futures[i]=callback; + futures[i] = callback; scheduler.schedule(new Runnable() { @Override @@ -302,39 +301,37 @@ public class WriteFlusherTest flusher.onFail(new Throwable("THE CAUSE")); } } - ,random.nextInt(75)+1,TimeUnit.MILLISECONDS); - flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!")); + , random.nextInt(75) + 1, TimeUnit.MILLISECONDS); + flusher.write(_context, callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!")); } - int completed=0; - int failed=0; - - for (int i=0;i())); + // make sure that we call .get() on the write that executed second by waiting on this latch + assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true)); try { executor.submit(new Writer(writeFlusher, new FutureCallback())).get(); @@ -459,70 +458,87 @@ public class WriteFlusherTest } @Test - @Ignore - public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException + public void testConcurrentAccessToIncompleteWriteAndOnFail() throws IOException, InterruptedException, + ExecutionException, TimeoutException { - ExecutorService executor = Executors.newFixedThreadPool(16); final CountDownLatch failedCalledLatch = new CountDownLatch(1); final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1); final CountDownLatch writeCalledLatch = new CountDownLatch(1); final CountDownLatch completeWrite = new CountDownLatch(1); - final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) + final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch)) { protected void onIncompleteFlushed() { onIncompleteFlushedCalledLatch.countDown(); + try + { + failedCalledLatch.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } completeWrite(); completeWrite.countDown(); } }; - endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch); - ExposingStateCallback callback = new ExposingStateCallback(); executor.submit(new Writer(writeFlusher, callback)); assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + // make sure we're in pending state when calling onFail + assertThat("onIncompleteFlushed has been called.", onIncompleteFlushedCalledLatch.await(5, + TimeUnit.SECONDS), is(true)); executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)); assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true)); - writeFlusher.write(_context, new FutureCallback(), BufferUtil.toBuffer("foobar")); assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true)); + // when we fail in PENDING state, we should have called callback.failed() + assertThat("callback failed has been called", callback.isFailed(), is(true)); + assertThat("callback complete has not been called", callback.isCompleted(), is(false)); } - - //TODO: combine with endPointFlushExpectation - private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch - failedCalledLatch) - throws - IOException + private static class EndPointMock extends ByteArrayEndPoint { - when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer() + private final CountDownLatch writeCalledLatch; + private final CountDownLatch failedCalledLatch; + + public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch) { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable + this.writeCalledLatch = writeCalledLatch; + this.failedCalledLatch = failedCalledLatch; + } + + @Override + public int flush(ByteBuffer... buffers) throws IOException + { + writeCalledLatch.countDown(); + ByteBuffer byteBuffer = buffers[0]; + int oldPos = byteBuffer.position(); + if (byteBuffer.remaining() == 2) { - writeCalledLatch.countDown(); - Object[] arguments = invocation.getArguments(); - ByteBuffer byteBuffer = (ByteBuffer)arguments[0]; - int oldPos = byteBuffer.position(); - if (byteBuffer.remaining() == 2) + // make sure failed is called before we go on + try { - // make sure failed is called before we go on failedCalledLatch.await(5, TimeUnit.SECONDS); - BufferUtil.flipToFill(byteBuffer); } - else if (byteBuffer.remaining() == 3) + catch (InterruptedException e) { - byteBuffer.position(1); // pretend writing one byte - return 1; + e.printStackTrace(); } - else - { - byteBuffer.position(byteBuffer.limit()); - } - return byteBuffer.limit() - oldPos; + BufferUtil.flipToFill(byteBuffer); } - }); + else if (byteBuffer.remaining() == 3) + { + byteBuffer.position(1); // pretend writing one byte + return 1; + } + else + { + byteBuffer.position(byteBuffer.limit()); + } + return byteBuffer.limit() - oldPos; + } } private static class FailedCaller implements Callable @@ -550,7 +566,7 @@ public class WriteFlusherTest private final WriteFlusher writeFlusher; private FutureCallback callback; - public Writer(WriteFlusher writeFlusher, FutureCallback callback) + public Writer(WriteFlusher writeFlusher, FutureCallback callback) { this.writeFlusher = writeFlusher; this.callback = callback; diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyEndPoint.java similarity index 98% rename from jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java rename to jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyEndPoint.java index 8dc81ca8c67..2b7189c1ad9 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyAsyncEndPoint.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/EmptyEndPoint.java @@ -23,7 +23,7 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; -public class EmptyAsyncEndPoint implements EndPoint +public class EmptyEndPoint implements EndPoint { private boolean checkForIdle; private Connection connection; diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientConnection.java similarity index 92% rename from jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java rename to jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientConnection.java index cb9e6019391..e1cab1e7825 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/NextProtoNegoClientConnection.java @@ -26,7 +26,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class NextProtoNegoClientAsyncConnection extends AbstractConnection implements NextProtoNego.ClientProvider +public class NextProtoNegoClientConnection extends AbstractConnection implements NextProtoNego.ClientProvider { private final Logger logger = Log.getLogger(getClass()); private final SocketChannel channel; @@ -34,7 +34,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractConnection imple private final SPDYClient client; private volatile boolean completed; - public NextProtoNegoClientAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client) + public NextProtoNegoClientConnection(SocketChannel channel, EndPoint endPoint, Object attachment, Executor executor, SPDYClient client) { super(endPoint, executor); this.channel = channel; @@ -48,7 +48,7 @@ public class NextProtoNegoClientAsyncConnection extends AbstractConnection imple super.onOpen(); fillInterested(); } - + @Override public void onFillable() { diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 81865d5b2f7..0f1eba588ae 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -347,7 +347,7 @@ public class SPDYClient }; EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); - NextProtoNegoClientAsyncConnection connection = new NextProtoNegoClientAsyncConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client); + NextProtoNegoClientConnection connection = new NextProtoNegoClientConnection(channel, sslEndPoint, attachment, client.factory.threadPool, client); sslEndPoint.setConnection(connection); connectionOpened(connection); @@ -414,7 +414,7 @@ public class SPDYClient Parser parser = new Parser(compressionFactory.newDecompressor()); Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor()); - SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory); + SPDYConnection connection = new ClientSPDYConnection(endPoint, factory.bufferPool, parser, factory); endPoint.setConnection(connection); FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy(); @@ -430,11 +430,11 @@ public class SPDYClient return connection; } - private class ClientSPDYAsyncConnection extends SPDYAsyncConnection + private class ClientSPDYConnection extends SPDYConnection { private final Factory factory; - public ClientSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory) + public ClientSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory) { super(endPoint, bufferPool, parser, factory.threadPool); this.factory = factory; diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYConnection.java similarity index 92% rename from jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java rename to jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYConnection.java index 810ad41020b..c92b72ee5f0 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYConnection.java @@ -18,23 +18,23 @@ import java.nio.ByteBuffer; import java.util.concurrent.Executor; import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.spdy.parser.Parser; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class SPDYAsyncConnection extends AbstractConnection implements Controller, IdleListener +public class SPDYConnection extends AbstractConnection implements Controller, IdleListener { - private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class); + private static final Logger logger = Log.getLogger(SPDYConnection.class); private final ByteBufferPool bufferPool; private final Parser parser; private volatile ISession session; private volatile boolean idle = false; - public SPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor) + public SPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor) { super(endPoint, executor); this.bufferPool = bufferPool; @@ -48,7 +48,7 @@ public class SPDYAsyncConnection extends AbstractConnection implements Controlle super.onOpen(); fillInterested(); } - + @Override public void onFillable() { diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java index 006330f1766..115808e9ea5 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java @@ -21,8 +21,6 @@ import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import javax.net.ssl.SSLEngine; import org.eclipse.jetty.io.Connection; @@ -42,7 +40,7 @@ public class SPDYServerConnector extends SelectChannelConnector { private final Queue sessions = new ConcurrentLinkedQueue<>(); private final ServerSessionFrameListener listener; - private volatile int initialWindowSize = 65536; + private volatile int initialWindowSize; public SPDYServerConnector(Server server, ServerSessionFrameListener listener) { @@ -53,6 +51,7 @@ public class SPDYServerConnector extends SelectChannelConnector { super(server, sslContextFactory); this.listener = listener; + setInitialWindowSize(65536); putConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), listener)); putConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), listener)); setDefaultConnectionFactory(getConnectionFactory("spdy/2")); @@ -173,25 +172,10 @@ public class SPDYServerConnector extends SelectChannelConnector getSelectorManager().connectionUpgraded(endPoint, oldConnection); } - private class LazyExecutor implements Executor - { - @Override - public void execute(Runnable command) - { - Executor threadPool = getExecutor(); - if (threadPool == null) - throw new RejectedExecutionException(); - threadPool.execute(command); - } - } - - @Override public void dump(Appendable out, String indent) throws IOException { - super.dump(out,indent); - AggregateLifeCycle.dump(out, indent, new ArrayList(sessions)); + super.dump(out, indent); + AggregateLifeCycle.dump(out, indent, new ArrayList<>(sessions)); } - - } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index 6f857199776..12624d8bd01 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -62,7 +62,7 @@ public class ServerSPDYAsyncConnectionFactory implements ConnectionFactory SPDYServerConnector connector = (SPDYServerConnector)attachment; ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment); - SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector); + SPDYConnection connection = new ServerSPDYConnection(endPoint, bufferPool, parser, listener, connector); endPoint.setConnection(connection); FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version); @@ -83,13 +83,13 @@ public class ServerSPDYAsyncConnectionFactory implements ConnectionFactory return listener; } - private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection + private static class ServerSPDYConnection extends SPDYConnection { private final ServerSessionFrameListener listener; private final SPDYServerConnector connector; private volatile boolean connected; - private ServerSPDYAsyncConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) + private ServerSPDYConnection(EndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) { super(endPoint, bufferPool, parser, connector.getExecutor()); this.listener = listener;