jetty-9 protection from double onFillable calls

This commit is contained in:
Greg Wilkins 2012-09-09 08:31:31 +10:00
parent 2bb425d0b2
commit a15d71932f
3 changed files with 76 additions and 16 deletions

View File

@ -20,7 +20,7 @@ package org.eclipse.jetty.io;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
@ -38,16 +38,18 @@ public abstract class AbstractConnection implements Connection
{
private static final Logger LOG = Log.getLogger(AbstractConnection.class);
private final AtomicBoolean _readInterested = new AtomicBoolean();
private final EndPoint _endPoint;
private final Executor _executor;
private final Callback<Void> _readCallback;
private enum State {IDLE,INTERESTED,FILLING,FILLING_INTERESTED};
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public AbstractConnection(EndPoint endp, Executor executor)
{
this(endp, executor, true);
}
public AbstractConnection(EndPoint endp, Executor executor, final boolean dispatchCompletion)
{
if (executor == null)
@ -59,8 +61,41 @@ public abstract class AbstractConnection implements Connection
@Override
protected void onCompleted(Void context)
{
if (_readInterested.compareAndSet(true, false))
onFillable();
if (_state.compareAndSet(State.INTERESTED,State.FILLING))
{
try
{
onFillable();
}
finally
{
loop:while(true)
{
switch(_state.get())
{
case IDLE:
case INTERESTED:
throw new IllegalStateException();
case FILLING:
if (_state.compareAndSet(State.FILLING,State.IDLE))
break loop;
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
{
getEndPoint().fillInterested(null, _readCallback);
break loop;
}
break;
}
}
}
}
else
LOG.warn(new Throwable());
}
@Override
@ -97,8 +132,30 @@ public abstract class AbstractConnection implements Connection
public void fillInterested()
{
LOG.debug("fillInterested {}",this);
if (_readInterested.compareAndSet(false, true))
getEndPoint().fillInterested(null, _readCallback);
loop:while(true)
{
switch(_state.get())
{
case IDLE:
if (_state.compareAndSet(State.IDLE,State.INTERESTED))
{
getEndPoint().fillInterested(null, _readCallback);
break loop;
}
break;
case FILLING:
if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
break loop;
break;
case FILLING_INTERESTED:
case INTERESTED:
break loop;
}
}
}
/**
@ -165,6 +222,6 @@ public abstract class AbstractConnection implements Connection
@Override
public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _readInterested.get() ? "R" : "");
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
}
}

View File

@ -105,13 +105,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void reset()
{
releaseRequestBuffer();
if (_chunk!=null)
{
_bufferPool.release(_chunk);
_chunk=null;
}
// If we are still expecting
if (_channel.isExpecting100Continue())
{
@ -130,6 +123,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_generator.reset();
_channel.reset();
releaseRequestBuffer();
if (_chunk!=null)
{
_bufferPool.release(_chunk);
_chunk=null;
}
}
@Override
@ -178,6 +178,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_requestBuffer = _bufferPool.acquire(_configuration.getRequestHeaderSize(), false);
int filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimisation for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
LOG.debug("{} filled {}", this, filled);

View File

@ -182,7 +182,7 @@ public class SSLEngineTest
server.setHandler(new HelloWorldHandler());
server.start();
final int loops=20;
final int loops=10;
final int numConns=20;
Socket[] client=new Socket[numConns];