* Fixes #11763 - Race condition in QoSHandler. Now using a read-write lock to atomically execute expire(). This guarantees that there are no races with resume(). The concurrency between handle() and resume(), which should be the most common case, is handled by atomic data structures. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
33bc4f7376
commit
a9b2da533f
|
@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.io.CyclicTimeouts;
|
||||
|
@ -74,6 +76,7 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
private static final Logger LOG = LoggerFactory.getLogger(QoSHandler.class);
|
||||
private static final String EXPIRED_ATTRIBUTE_NAME = QoSHandler.class.getName() + ".expired";
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final AtomicInteger state = new AtomicInteger();
|
||||
private final Map<Integer, Queue<Entry>> queues = new ConcurrentHashMap<>();
|
||||
private final Set<Integer> priorities = new ConcurrentSkipListSet<>(Comparator.reverseOrder());
|
||||
|
@ -182,34 +185,54 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
@Override
|
||||
public boolean onConditionsMet(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} handling {}", this, request);
|
||||
return process(request, response, callback);
|
||||
}
|
||||
|
||||
int permits = state.getAndDecrement();
|
||||
if (permits > 0)
|
||||
private boolean process(Request request, Response response, Callback callback) throws Exception
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} processing {}", this, request);
|
||||
|
||||
boolean expired = false;
|
||||
|
||||
// The read lock allows concurrency with resume(),
|
||||
// which is the common case, but not with expire().
|
||||
lock.readLock().lock();
|
||||
try
|
||||
{
|
||||
int permits = state.decrementAndGet();
|
||||
if (permits < 0)
|
||||
{
|
||||
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) == null)
|
||||
{
|
||||
// Cover this race condition:
|
||||
// T1 in this method may find no permits, so it will suspend the request.
|
||||
// T2 in resume() finds no suspended request yet and increments the permits.
|
||||
// T1 suspends the request, despite permits are available.
|
||||
// This is avoided in resume() using a spin loop to wait for the request to be suspended.
|
||||
// See correspondent state machine logic in resume() and expire().
|
||||
suspend(request, response, callback);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// This is a request that was suspended, it expired, and was re-handled.
|
||||
// Do not suspend it again, just complete it with 503 unavailable.
|
||||
state.incrementAndGet();
|
||||
expired = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (!expired)
|
||||
return handleWithPermit(request, response, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (request.getAttribute(EXPIRED_ATTRIBUTE_NAME) != null)
|
||||
{
|
||||
// This is a request that was suspended, and it expired.
|
||||
// Do not suspend it again, just complete it with 503.
|
||||
state.getAndIncrement();
|
||||
notAvailable(response, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Avoid this race condition:
|
||||
// T1 in handle() may find no permits, so it will suspend the request.
|
||||
// T2 in resume() finds no suspended requests and increments the permits.
|
||||
// T1 suspends the request, which will remain suspended despite permits are available.
|
||||
// See correspondent state machine logic in resume() and expire().
|
||||
suspend(request, response, callback);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
notAvailable(response, callback);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -287,24 +310,33 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
|
||||
private void resume(Throwable x)
|
||||
{
|
||||
// See correspondent state machine logic in handle() and expire().
|
||||
int permits = state.getAndIncrement();
|
||||
if (permits >= 0)
|
||||
// Allows concurrency with process(), but not with expire().
|
||||
lock.readLock().lock();
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} no suspended requests to resume", this, x);
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (resumeSuspended())
|
||||
// See correspondent state machine logic in process() and expire().
|
||||
int permits = state.incrementAndGet();
|
||||
if (permits > 0)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} no suspended requests to resume", this, x);
|
||||
return;
|
||||
}
|
||||
|
||||
// Found no suspended requests yet, but there will be.
|
||||
// This covers the small race window in handle(), where
|
||||
// the state is updated and then the request suspended.
|
||||
Thread.onSpinWait();
|
||||
while (true)
|
||||
{
|
||||
if (resumeSuspended())
|
||||
return;
|
||||
|
||||
// Found no suspended requests yet, but there will be.
|
||||
// This covers the small race window in process(), where
|
||||
// the state is updated and then the request suspended.
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,17 +389,32 @@ public class QoSHandler extends ConditionalHandler.Abstract
|
|||
|
||||
private void expire()
|
||||
{
|
||||
// The request timed out, therefore it never acquired a permit.
|
||||
boolean removed = queues.get(priority).remove(this);
|
||||
if (removed)
|
||||
boolean removed;
|
||||
// It should be rare that requests expire.
|
||||
// Grab the write lock to atomically operate on the queue and
|
||||
// the state, avoiding concurrency with process() and resume().
|
||||
lock.writeLock().lock();
|
||||
try
|
||||
{
|
||||
// See correspondent state machine logic in handle() and resume().
|
||||
state.getAndIncrement();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} timeout {}", QoSHandler.this, request);
|
||||
request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true);
|
||||
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
|
||||
// The request timed out, therefore it was not handled.
|
||||
removed = queues.get(priority).remove(this);
|
||||
// The remove() may fail to a concurrent resume().
|
||||
if (removed)
|
||||
{
|
||||
// See correspondent state machine logic in process() and resume().
|
||||
state.incrementAndGet();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} timeout {}", QoSHandler.this, request);
|
||||
request.setAttribute(EXPIRED_ATTRIBUTE_NAME, true);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
if (removed)
|
||||
failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue