Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-02-29 11:33:25 +01:00
commit bb08b65d15
2 changed files with 47 additions and 62 deletions

View File

@ -46,9 +46,9 @@ public class HTTP2Flusher extends IteratingCallback
private final Map<IStream, Integer> streams = new HashMap<>(); private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> resets = new ArrayList<>(); private final List<Entry> resets = new ArrayList<>();
private final List<Entry> actives = new ArrayList<>(); private final List<Entry> actives = new ArrayList<>();
private final Queue<Entry> completes = new ArrayDeque<>();
private final HTTP2Session session; private final HTTP2Session session;
private final ByteBufferPool.Lease lease; private final ByteBufferPool.Lease lease;
private boolean terminated;
public HTTP2Flusher(HTTP2Session session) public HTTP2Flusher(HTTP2Session session)
{ {
@ -58,57 +58,52 @@ public class HTTP2Flusher extends IteratingCallback
public void window(IStream stream, WindowUpdateFrame frame) public void window(IStream stream, WindowUpdateFrame frame)
{ {
boolean added = false; boolean closed;
synchronized (this) synchronized (this)
{ {
if (!isClosed()) closed = terminated;
added = windows.offer(new WindowEntry(stream, frame)); if (!closed)
windows.offer(new WindowEntry(stream, frame));
} }
// Flush stalled data. // Flush stalled data.
if (added) if (!closed)
iterate(); iterate();
} }
public boolean prepend(Entry entry) public boolean prepend(Entry entry)
{ {
boolean fail = false; boolean closed;
synchronized (this) synchronized (this)
{ {
if (isClosed()) closed = terminated;
{ if (!closed)
fail = true;
}
else
{ {
frames.add(0, entry); frames.add(0, entry);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Prepended {}, frames={}", entry, frames.size()); LOG.debug("Prepended {}, frames={}", entry, frames.size());
} }
} }
if (fail) if (closed)
closed(entry, new ClosedChannelException()); closed(entry, new ClosedChannelException());
return !fail; return !closed;
} }
public boolean append(Entry entry) public boolean append(Entry entry)
{ {
boolean fail = false; boolean closed;
synchronized (this) synchronized (this)
{ {
if (isClosed()) closed = terminated;
{ if (!closed)
fail = true;
}
else
{ {
frames.offer(entry); frames.offer(entry);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Appended {}, frames={}", entry, frames.size()); LOG.debug("Appended {}, frames={}", entry, frames.size());
} }
} }
if (fail) if (closed)
closed(entry, new ClosedChannelException()); closed(entry, new ClosedChannelException());
return !fail; return !closed;
} }
private Entry remove(int index) private Entry remove(int index)
@ -138,6 +133,9 @@ public class HTTP2Flusher extends IteratingCallback
synchronized (this) synchronized (this)
{ {
if (terminated)
throw new ClosedChannelException();
// First thing, update the window sizes, so we can // First thing, update the window sizes, so we can
// reason about the frames to remove from the queue. // reason about the frames to remove from the queue.
while (!windows.isEmpty()) while (!windows.isEmpty())
@ -226,12 +224,8 @@ public class HTTP2Flusher extends IteratingCallback
if (actives.isEmpty()) if (actives.isEmpty())
{ {
if (isClosed())
fail(new ClosedChannelException(), true);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session); LOG.debug("Flushed {}", session);
return Action.IDLE; return Action.IDLE;
} }
@ -259,20 +253,11 @@ public class HTTP2Flusher extends IteratingCallback
{ {
lease.recycle(); lease.recycle();
// Transfer active items to avoid reentrancy.
for (int i = 0; i < actives.size(); ++i)
completes.add(actives.get(i));
actives.clear();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", completes.size(), completes); LOG.debug("Written {} frames for {}", actives.size(), actives);
// Drain the frames one by one to avoid reentrancy. actives.forEach(Entry::succeeded);
while (!completes.isEmpty()) actives.clear();
{
Entry entry = completes.poll();
entry.succeeded();
}
super.succeeded(); super.succeeded();
} }
@ -288,40 +273,40 @@ public class HTTP2Flusher extends IteratingCallback
{ {
lease.recycle(); lease.recycle();
// Transfer active items to avoid reentrancy. boolean closed;
for (int i = 0; i < actives.size(); ++i)
completes.add(actives.get(i));
actives.clear();
// Drain the frames one by one to avoid reentrancy.
while (!completes.isEmpty())
{
Entry entry = completes.poll();
entry.failed(x);
}
fail(x, isClosed());
}
private void fail(Throwable x, boolean closed)
{
Queue<Entry> queued;
synchronized (this) synchronized (this)
{ {
queued = new ArrayDeque<>(frames); closed = terminated;
terminated = true;
if (LOG.isDebugEnabled())
LOG.debug("{}, active/queued={}/{}", closed ? "Closing" : "Failing", actives.size(), frames.size());
actives.addAll(frames);
frames.clear(); frames.clear();
} }
if (LOG.isDebugEnabled()) actives.forEach(entry -> entry.failed(x));
LOG.debug("{}, queued={}", closed ? "Closing" : "Failing", queued.size()); actives.clear();
for (Entry entry : queued)
entry.failed(x);
// If the failure came from within the
// flusher, we need to close the connection.
if (!closed) if (!closed)
session.abort(x); session.abort(x);
} }
void terminate()
{
boolean closed;
synchronized (this)
{
closed = terminated;
terminated = true;
if (LOG.isDebugEnabled())
LOG.debug("{}", closed ? "Terminated" : "Terminating");
}
if (!closed)
iterate();
}
private void closed(Entry entry, Throwable failure) private void closed(Entry entry, Throwable failure)
{ {
entry.failed(failure); entry.failed(failure);

View File

@ -907,7 +907,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
if (closed.compareAndSet(current, CloseState.CLOSED)) if (closed.compareAndSet(current, CloseState.CLOSED))
{ {
flusher.close(); flusher.terminate();
for (IStream stream : streams.values()) for (IStream stream : streams.values())
stream.close(); stream.close();
streams.clear(); streams.clear();