diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 83b2dbcf3a7..169123c58c4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -39,16 +39,16 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint private final FillInterest _fillInterest = new FillInterest() { @Override - protected boolean needsFill() throws IOException + protected void needsFillInterest() throws IOException { - return AbstractEndPoint.this.needsFill(); + AbstractEndPoint.this.needsFillInterest(); } }; private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { AbstractEndPoint.this.onIncompleteFlush(); } @@ -130,7 +130,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint protected abstract void onIncompleteFlush(); - protected abstract boolean needsFill() throws IOException; + protected abstract void needsFillInterest() throws IOException; protected FillInterest getFillInterest() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 4b8c527c2f6..ddd9a5dffa9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; @@ -102,10 +103,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint setIdleTimeout(idleTimeoutMs); } - - - - /* ------------------------------------------------------------ */ @Override protected void onIncompleteFlush() @@ -115,11 +112,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint /* ------------------------------------------------------------ */ @Override - protected boolean needsFill() throws IOException + protected void needsFillInterest() throws IOException { if (_closed) throw new ClosedChannelException(); - return _in == null || BufferUtil.hasContent(_in); + if (BufferUtil.hasContent(_in) || _in==null) + getScheduler().schedule(new Runnable(){public void run(){getFillInterest().fillable();}},1,TimeUnit.MILLISECONDS); } /* ------------------------------------------------------------ */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index b7344687908..e8987e73dd1 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -227,7 +227,7 @@ public class ChannelEndPoint extends AbstractEndPoint } @Override - protected boolean needsFill() throws IOException + protected void needsFillInterest() throws IOException { throw new UnsupportedOperationException(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java index dfc7563c305..846b8cba971 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java @@ -60,10 +60,9 @@ public abstract class FillInterest } try { - if (needsFill()) - fillable(); + needsFillInterest(); } - catch (IOException e) + catch (Throwable e) { onFail(e); } @@ -119,11 +118,9 @@ public abstract class FillInterest /** * Register the read interest * Abstract method to be implemented by the Specific ReadInterest to - * enquire if a read is immediately possible and if not to schedule a future - * call to {@link #fillable()} or {@link #onFail(Throwable)} + * schedule a future call to {@link #fillable()} or {@link #onFail(Throwable)} * - * @return true if a read is possible * @throws IOException */ - abstract protected boolean needsFill() throws IOException; + abstract protected void needsFillInterest() throws IOException; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/IdleTimeout.java b/jetty-io/src/main/java/org/eclipse/jetty/io/IdleTimeout.java index 64b996f021e..4f02125f4b7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/IdleTimeout.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/IdleTimeout.java @@ -61,6 +61,11 @@ public abstract class IdleTimeout _scheduler = scheduler; } + public Scheduler getScheduler() + { + return _scheduler; + } + public long getIdleTimestamp() { return _idleTimestamp; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 3af3082467c..29668d7782d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -66,10 +66,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa } @Override - protected boolean needsFill() + protected void needsFillInterest() { changeInterests(SelectionKey.OP_READ); - return false; } @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index f29fbff63e2..2f4a90c0a23 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.util.log.Logger; /** * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling * {@link EndPoint#flush(ByteBuffer...)} until all content is written. - * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to + * The abstract method {@link #onIncompleteFlush()} is called when not all content has been written after a call to * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush * should be able to make more progress. *

@@ -275,14 +275,14 @@ abstract public class WriteFlusher * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()} * or {@link #onFail(Throwable)} when appropriate. */ - abstract protected void onIncompleteFlushed(); + abstract protected void onIncompleteFlush(); /** * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition * fails it'll fail the callback. * * If not all buffers can be written in one go it creates a new PendingState object to preserve the state - * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}. + * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. * * If all buffers have been written it calls callback.complete(). * @@ -308,7 +308,7 @@ abstract public class WriteFlusher LOG.debug("flushed incomplete"); PendingState pending=new PendingState(buffers, callback); if (updateState(__WRITING,pending)) - onIncompleteFlushed(); + onIncompleteFlush(); else fail(pending); return; @@ -336,7 +336,7 @@ abstract public class WriteFlusher /** - * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this + * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. * * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback @@ -371,7 +371,7 @@ abstract public class WriteFlusher if (buffers!=pending.getBuffers()) pending=new PendingState(buffers, pending._callback); if (updateState(__COMPLETING,pending)) - onIncompleteFlushed(); + onIncompleteFlush(); else fail(pending); return; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 37c0d641684..fe206d0cedc 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -88,6 +88,7 @@ public class SslConnection extends AbstractConnection private ByteBuffer _encryptedOutput; private final boolean _encryptedDirectBuffers = false; private final boolean _decryptedDirectBuffers = false; + private boolean _renegotiationAllowed; private final Runnable _runCompletWrite = new Runnable() { @Override @@ -96,7 +97,14 @@ public class SslConnection extends AbstractConnection _decryptedEndPoint.getWriteFlusher().completeWrite(); } }; - private boolean _renegotiationAllowed; + private final Runnable _runFillable = new Runnable() + { + @Override + public void run() + { + _decryptedEndPoint.getFillInterest().fillable(); + } + }; public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine) { @@ -401,7 +409,7 @@ public class SslConnection extends AbstractConnection } @Override - protected boolean needsFill() throws IOException + protected void needsFillInterest() throws IOException { // This means that the decrypted data consumer has called the fillInterested // method on the DecryptedEndPoint, so we have to work out if there is @@ -411,11 +419,12 @@ public class SslConnection extends AbstractConnection synchronized (DecryptedEndPoint.this) { // Do we already have some app data, then app can fill now so return true - if (BufferUtil.hasContent(_decryptedInput)) - return true; + boolean fillable = (BufferUtil.hasContent(_decryptedInput)) + // or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill + || BufferUtil.hasContent(_encryptedInput) && !_underFlown; // If we have no encrypted data to decrypt OR we have some, but it is not enough - if (BufferUtil.isEmpty(_encryptedInput) || _underFlown) + if (!fillable) { // We are not ready to read data @@ -436,23 +445,17 @@ public class SslConnection extends AbstractConnection // we have already written the net data // pretend we are readable so the wrap is done by next readable callback _fillRequiresFlushToProgress = false; - return true; + fillable=true; } } - else - { - // Normal readable callback - // Get called back on onfillable when then is more data to fill - SslConnection.this.fillInterested(); - } + } + - return false; - } + if (fillable) + getExecutor().execute(_runFillable); else - { - // We are ready to read data - return true; - } + SslConnection.this.fillInterested(); + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java index f6cefa12e9b..aba6187bbbe 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.io; import static org.hamcrest.Matchers.*; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -29,6 +28,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.toolchain.test.AdvancedRunner; @@ -105,7 +105,6 @@ public class ByteArrayEndPointTest BufferUtil.clear(buffer); assertEquals(4,endp.fill(buffer)); assertEquals("more",BufferUtil.toString(buffer)); - } @Test @@ -161,6 +160,7 @@ public class ByteArrayEndPointTest FutureCallback fcb = new FutureCallback(); endp.fillInterested(fcb); + fcb.get(100,TimeUnit.MILLISECONDS); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); assertEquals(10, endp.fill(buffer)); @@ -168,10 +168,12 @@ public class ByteArrayEndPointTest fcb = new FutureCallback(); endp.fillInterested(fcb); + Thread.sleep(100); assertFalse(fcb.isDone()); assertEquals(0, endp.fill(buffer)); endp.setInput(" more"); + fcb.get(1000,TimeUnit.MILLISECONDS); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); assertEquals(5, endp.fill(buffer)); @@ -179,6 +181,7 @@ public class ByteArrayEndPointTest fcb = new FutureCallback(); endp.fillInterested(fcb); + Thread.sleep(100); assertFalse(fcb.isDone()); assertEquals(0, endp.fill(buffer)); @@ -189,6 +192,7 @@ public class ByteArrayEndPointTest fcb = new FutureCallback(); endp.fillInterested(fcb); + fcb.get(1000,TimeUnit.MILLISECONDS); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); assertEquals(-1, endp.fill(buffer)); @@ -197,10 +201,9 @@ public class ByteArrayEndPointTest fcb = new FutureCallback(); endp.fillInterested(fcb); - assertTrue(fcb.isDone()); try { - fcb.get(); + fcb.get(1000,TimeUnit.MILLISECONDS); fail(); } catch (ExecutionException e) @@ -278,6 +281,7 @@ public class ByteArrayEndPointTest FutureCallback fcb = new FutureCallback(); endp.fillInterested(fcb); + fcb.get(100,TimeUnit.MILLISECONDS); assertTrue(fcb.isDone()); assertEquals(null, fcb.get()); assertEquals(4, endp.fill(buffer)); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index 572ff79a6e3..7039dc67e3d 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -77,7 +77,7 @@ public class WriteFlusherTest _flusher = new WriteFlusher(_endp) { @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { _flushIncomplete.set(true); } @@ -275,7 +275,7 @@ public class WriteFlusherTest } @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { _scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS); } @@ -368,7 +368,7 @@ public class WriteFlusherTest } @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { } }; @@ -469,7 +469,7 @@ public class WriteFlusherTest final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock) { @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { } }; @@ -529,7 +529,7 @@ public class WriteFlusherTest final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch)) { @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { onIncompleteFlushedCalledLatch.countDown(); try @@ -622,7 +622,7 @@ public class WriteFlusherTest final WriteFlusher writeFlusher = new WriteFlusher(endp) { @Override - protected void onIncompleteFlushed() + protected void onIncompleteFlush() { executor.submit(new Runnable() { diff --git a/jetty-io/src/test/resources/jetty-logging.properties b/jetty-io/src/test/resources/jetty-logging.properties index f17a079198f..257743ed54b 100644 --- a/jetty-io/src/test/resources/jetty-logging.properties +++ b/jetty-io/src/test/resources/jetty-logging.properties @@ -1,5 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.LEVEL=INFO -org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG -org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG -org.eclipse.jetty.io.ssl.SslConnection.LEVEL=DEBUG +#org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG +#org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG +#org.eclipse.jetty.io.ssl.SslConnection.LEVEL=DEBUG diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 26ff80fb5bf..4d4fcfc4247 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -187,7 +187,7 @@ public class LocalConnector extends AbstractConnector public LocalEndPoint() { - super(getScheduler(), LocalConnector.this.getIdleTimeout()); + super(LocalConnector.this.getScheduler(), LocalConnector.this.getIdleTimeout()); setGrowOutput(true); }