fillInterest never recurses

This commit is contained in:
Greg Wilkins 2014-12-19 23:03:01 +01:00
parent 448f150ac4
commit b0b038f5b0
12 changed files with 64 additions and 58 deletions

View File

@ -39,16 +39,16 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
private final FillInterest _fillInterest = new FillInterest()
{
@Override
protected boolean needsFill() throws IOException
protected void needsFillInterest() throws IOException
{
return AbstractEndPoint.this.needsFill();
AbstractEndPoint.this.needsFillInterest();
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
AbstractEndPoint.this.onIncompleteFlush();
}
@ -130,7 +130,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
protected abstract void onIncompleteFlush();
protected abstract boolean needsFill() throws IOException;
protected abstract void needsFillInterest() throws IOException;
protected FillInterest getFillInterest()
{

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
@ -102,10 +103,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint
setIdleTimeout(idleTimeoutMs);
}
/* ------------------------------------------------------------ */
@Override
protected void onIncompleteFlush()
@ -115,11 +112,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
@Override
protected boolean needsFill() throws IOException
protected void needsFillInterest() throws IOException
{
if (_closed)
throw new ClosedChannelException();
return _in == null || BufferUtil.hasContent(_in);
if (BufferUtil.hasContent(_in) || _in==null)
getScheduler().schedule(new Runnable(){public void run(){getFillInterest().fillable();}},1,TimeUnit.MILLISECONDS);
}
/* ------------------------------------------------------------ */

View File

@ -227,7 +227,7 @@ public class ChannelEndPoint extends AbstractEndPoint
}
@Override
protected boolean needsFill() throws IOException
protected void needsFillInterest() throws IOException
{
throw new UnsupportedOperationException();
}

View File

@ -60,10 +60,9 @@ public abstract class FillInterest
}
try
{
if (needsFill())
fillable();
needsFillInterest();
}
catch (IOException e)
catch (Throwable e)
{
onFail(e);
}
@ -119,11 +118,9 @@ public abstract class FillInterest
/**
* Register the read interest
* Abstract method to be implemented by the Specific ReadInterest to
* enquire if a read is immediately possible and if not to schedule a future
* call to {@link #fillable()} or {@link #onFail(Throwable)}
* schedule a future call to {@link #fillable()} or {@link #onFail(Throwable)}
*
* @return true if a read is possible
* @throws IOException
*/
abstract protected boolean needsFill() throws IOException;
abstract protected void needsFillInterest() throws IOException;
}

View File

@ -61,6 +61,11 @@ public abstract class IdleTimeout
_scheduler = scheduler;
}
public Scheduler getScheduler()
{
return _scheduler;
}
public long getIdleTimestamp()
{
return _idleTimestamp;

View File

@ -66,10 +66,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
}
@Override
protected boolean needsFill()
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
return false;
}
@Override

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.util.log.Logger;
/**
* A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
* {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
* The abstract method {@link #onIncompleteFlush()} is called when not all content has been written after a call to
* flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
* should be able to make more progress.
* <p>
@ -275,14 +275,14 @@ abstract public class WriteFlusher
* Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
* or {@link #onFail(Throwable)} when appropriate.
*/
abstract protected void onIncompleteFlushed();
abstract protected void onIncompleteFlush();
/**
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
* fails it'll fail the callback.
*
* If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
* and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
* and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
*
* If all buffers have been written it calls callback.complete().
*
@ -308,7 +308,7 @@ abstract public class WriteFlusher
LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
onIncompleteFlush();
else
fail(pending);
return;
@ -336,7 +336,7 @@ abstract public class WriteFlusher
/**
* Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
* Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
*
* It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
@ -371,7 +371,7 @@ abstract public class WriteFlusher
if (buffers!=pending.getBuffers())
pending=new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
onIncompleteFlush();
else
fail(pending);
return;

View File

@ -88,6 +88,7 @@ public class SslConnection extends AbstractConnection
private ByteBuffer _encryptedOutput;
private final boolean _encryptedDirectBuffers = false;
private final boolean _decryptedDirectBuffers = false;
private boolean _renegotiationAllowed;
private final Runnable _runCompletWrite = new Runnable()
{
@Override
@ -96,7 +97,14 @@ public class SslConnection extends AbstractConnection
_decryptedEndPoint.getWriteFlusher().completeWrite();
}
};
private boolean _renegotiationAllowed;
private final Runnable _runFillable = new Runnable()
{
@Override
public void run()
{
_decryptedEndPoint.getFillInterest().fillable();
}
};
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine)
{
@ -401,7 +409,7 @@ public class SslConnection extends AbstractConnection
}
@Override
protected boolean needsFill() throws IOException
protected void needsFillInterest() throws IOException
{
// This means that the decrypted data consumer has called the fillInterested
// method on the DecryptedEndPoint, so we have to work out if there is
@ -411,11 +419,12 @@ public class SslConnection extends AbstractConnection
synchronized (DecryptedEndPoint.this)
{
// Do we already have some app data, then app can fill now so return true
if (BufferUtil.hasContent(_decryptedInput))
return true;
boolean fillable = (BufferUtil.hasContent(_decryptedInput))
// or if we have encryptedInput and have not underflowed yet, the it is worth trying a fill
|| BufferUtil.hasContent(_encryptedInput) && !_underFlown;
// If we have no encrypted data to decrypt OR we have some, but it is not enough
if (BufferUtil.isEmpty(_encryptedInput) || _underFlown)
if (!fillable)
{
// We are not ready to read data
@ -436,23 +445,17 @@ public class SslConnection extends AbstractConnection
// we have already written the net data
// pretend we are readable so the wrap is done by next readable callback
_fillRequiresFlushToProgress = false;
return true;
fillable=true;
}
}
else
{
// Normal readable callback
// Get called back on onfillable when then is more data to fill
SslConnection.this.fillInterested();
}
}
return false;
}
if (fillable)
getExecutor().execute(_runFillable);
else
{
// We are ready to read data
return true;
}
SslConnection.this.fillInterested();
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@ -29,6 +28,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
@ -105,7 +105,6 @@ public class ByteArrayEndPointTest
BufferUtil.clear(buffer);
assertEquals(4,endp.fill(buffer));
assertEquals("more",BufferUtil.toString(buffer));
}
@Test
@ -161,6 +160,7 @@ public class ByteArrayEndPointTest
FutureCallback fcb = new FutureCallback();
endp.fillInterested(fcb);
fcb.get(100,TimeUnit.MILLISECONDS);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(10, endp.fill(buffer));
@ -168,10 +168,12 @@ public class ByteArrayEndPointTest
fcb = new FutureCallback();
endp.fillInterested(fcb);
Thread.sleep(100);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
endp.setInput(" more");
fcb.get(1000,TimeUnit.MILLISECONDS);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(5, endp.fill(buffer));
@ -179,6 +181,7 @@ public class ByteArrayEndPointTest
fcb = new FutureCallback();
endp.fillInterested(fcb);
Thread.sleep(100);
assertFalse(fcb.isDone());
assertEquals(0, endp.fill(buffer));
@ -189,6 +192,7 @@ public class ByteArrayEndPointTest
fcb = new FutureCallback();
endp.fillInterested(fcb);
fcb.get(1000,TimeUnit.MILLISECONDS);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(-1, endp.fill(buffer));
@ -197,10 +201,9 @@ public class ByteArrayEndPointTest
fcb = new FutureCallback();
endp.fillInterested(fcb);
assertTrue(fcb.isDone());
try
{
fcb.get();
fcb.get(1000,TimeUnit.MILLISECONDS);
fail();
}
catch (ExecutionException e)
@ -278,6 +281,7 @@ public class ByteArrayEndPointTest
FutureCallback fcb = new FutureCallback();
endp.fillInterested(fcb);
fcb.get(100,TimeUnit.MILLISECONDS);
assertTrue(fcb.isDone());
assertEquals(null, fcb.get());
assertEquals(4, endp.fill(buffer));

View File

@ -77,7 +77,7 @@ public class WriteFlusherTest
_flusher = new WriteFlusher(_endp)
{
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
_flushIncomplete.set(true);
}
@ -275,7 +275,7 @@ public class WriteFlusherTest
}
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
_scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS);
}
@ -368,7 +368,7 @@ public class WriteFlusherTest
}
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
}
};
@ -469,7 +469,7 @@ public class WriteFlusherTest
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
}
};
@ -529,7 +529,7 @@ public class WriteFlusherTest
final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch))
{
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
onIncompleteFlushedCalledLatch.countDown();
try
@ -622,7 +622,7 @@ public class WriteFlusherTest
final WriteFlusher writeFlusher = new WriteFlusher(endp)
{
@Override
protected void onIncompleteFlushed()
protected void onIncompleteFlush()
{
executor.submit(new Runnable()
{

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=INFO
org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
org.eclipse.jetty.io.ssl.SslConnection.LEVEL=DEBUG
#org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
#org.eclipse.jetty.io.ManagedSelector.LEVEL=DEBUG
#org.eclipse.jetty.io.ssl.SslConnection.LEVEL=DEBUG

View File

@ -187,7 +187,7 @@ public class LocalConnector extends AbstractConnector
public LocalEndPoint()
{
super(getScheduler(), LocalConnector.this.getIdleTimeout());
super(LocalConnector.this.getScheduler(), LocalConnector.this.getIdleTimeout());
setGrowOutput(true);
}