From aadfae936c5fb3b668af219ef57ae43fd33545e2 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 29 Feb 2016 11:32:19 +0100 Subject: [PATCH] Issue #366 (Avoid HTTP2Flusher reentrancy). Made HTTP2Flusher termination to be atomic and non-reentrant. --- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 107 ++++++++---------- .../org/eclipse/jetty/http2/HTTP2Session.java | 2 +- 2 files changed, 47 insertions(+), 62 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index 10b4e3fd58b..9ddd316d8b4 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -46,9 +46,9 @@ public class HTTP2Flusher extends IteratingCallback private final Map streams = new HashMap<>(); private final List resets = new ArrayList<>(); private final List actives = new ArrayList<>(); - private final Queue completes = new ArrayDeque<>(); private final HTTP2Session session; private final ByteBufferPool.Lease lease; + private boolean terminated; public HTTP2Flusher(HTTP2Session session) { @@ -58,57 +58,52 @@ public class HTTP2Flusher extends IteratingCallback public void window(IStream stream, WindowUpdateFrame frame) { - boolean added = false; + boolean closed; synchronized (this) { - if (!isClosed()) - added = windows.offer(new WindowEntry(stream, frame)); + closed = terminated; + if (!closed) + windows.offer(new WindowEntry(stream, frame)); } // Flush stalled data. - if (added) + if (!closed) iterate(); } public boolean prepend(Entry entry) { - boolean fail = false; + boolean closed; synchronized (this) { - if (isClosed()) - { - fail = true; - } - else + closed = terminated; + if (!closed) { frames.add(0, entry); if (LOG.isDebugEnabled()) LOG.debug("Prepended {}, frames={}", entry, frames.size()); } } - if (fail) + if (closed) closed(entry, new ClosedChannelException()); - return !fail; + return !closed; } public boolean append(Entry entry) { - boolean fail = false; + boolean closed; synchronized (this) { - if (isClosed()) - { - fail = true; - } - else + closed = terminated; + if (!closed) { frames.offer(entry); if (LOG.isDebugEnabled()) LOG.debug("Appended {}, frames={}", entry, frames.size()); } } - if (fail) + if (closed) closed(entry, new ClosedChannelException()); - return !fail; + return !closed; } private Entry remove(int index) @@ -138,6 +133,9 @@ public class HTTP2Flusher extends IteratingCallback synchronized (this) { + if (terminated) + throw new ClosedChannelException(); + // First thing, update the window sizes, so we can // reason about the frames to remove from the queue. while (!windows.isEmpty()) @@ -226,12 +224,8 @@ public class HTTP2Flusher extends IteratingCallback if (actives.isEmpty()) { - if (isClosed()) - fail(new ClosedChannelException(), true); - if (LOG.isDebugEnabled()) LOG.debug("Flushed {}", session); - return Action.IDLE; } @@ -259,20 +253,11 @@ public class HTTP2Flusher extends IteratingCallback { lease.recycle(); - // Transfer active items to avoid reentrancy. - for (int i = 0; i < actives.size(); ++i) - completes.add(actives.get(i)); - actives.clear(); - if (LOG.isDebugEnabled()) - LOG.debug("Written {} frames for {}", completes.size(), completes); + LOG.debug("Written {} frames for {}", actives.size(), actives); - // Drain the frames one by one to avoid reentrancy. - while (!completes.isEmpty()) - { - Entry entry = completes.poll(); - entry.succeeded(); - } + actives.forEach(Entry::succeeded); + actives.clear(); super.succeeded(); } @@ -288,40 +273,40 @@ public class HTTP2Flusher extends IteratingCallback { lease.recycle(); - // Transfer active items to avoid reentrancy. - for (int i = 0; i < actives.size(); ++i) - completes.add(actives.get(i)); - actives.clear(); - - // Drain the frames one by one to avoid reentrancy. - while (!completes.isEmpty()) - { - Entry entry = completes.poll(); - entry.failed(x); - } - - fail(x, isClosed()); - } - - private void fail(Throwable x, boolean closed) - { - Queue queued; + boolean closed; synchronized (this) { - queued = new ArrayDeque<>(frames); + closed = terminated; + terminated = true; + if (LOG.isDebugEnabled()) + LOG.debug("{}, active/queued={}/{}", closed ? "Closing" : "Failing", actives.size(), frames.size()); + actives.addAll(frames); frames.clear(); } - if (LOG.isDebugEnabled()) - LOG.debug("{}, queued={}", closed ? "Closing" : "Failing", queued.size()); - - for (Entry entry : queued) - entry.failed(x); + actives.forEach(entry -> entry.failed(x)); + actives.clear(); + // If the failure came from within the + // flusher, we need to close the connection. if (!closed) session.abort(x); } + void terminate() + { + boolean closed; + synchronized (this) + { + closed = terminated; + terminated = true; + if (LOG.isDebugEnabled()) + LOG.debug("{}", closed ? "Terminated" : "Terminating"); + } + if (!closed) + iterate(); + } + private void closed(Entry entry, Throwable failure) { entry.failed(failure); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 7a5523a9ee4..308e374e367 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -907,7 +907,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { if (closed.compareAndSet(current, CloseState.CLOSED)) { - flusher.close(); + flusher.terminate(); for (IStream stream : streams.values()) stream.close(); streams.clear();