From f81bf7f945f578579a70017714d101f019c114fb Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 10 Sep 2020 10:13:14 +0200 Subject: [PATCH] Fixes #3766 - Introduce HTTP/2 API to batch frames. (#5222) * Fixes #3766 - Introduce HTTP/2 API to batch frames. Introduced Stream.FrameList to hold HEADERS+DATA+HEADERS frames. These are often used by the client and by the server when the request/response content is known and FrameList will allow to send them in a single TCP write, rather than multiple ones. Rewritten HttpSenderOverHTTP2.sendHeaders() and HttpTransportOverHTTP2.sendHeaders() to take advantage of FrameList. Now using ConcurrentHashMap as a client context, because with DEBUG logging enabled it may be access concurrently. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/client/HttpClient.java | 4 +- .../jetty/http2/client/HTTP2Client.java | 4 +- .../client/HTTP2ClientConnectionFactory.java | 5 +- .../jetty/http2/client/StreamResetTest.java | 7 +- .../http2/BufferingFlowControlStrategy.java | 4 +- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 24 ++ .../org/eclipse/jetty/http2/HTTP2Session.java | 120 +++++---- .../org/eclipse/jetty/http2/HTTP2Stream.java | 13 +- .../org/eclipse/jetty/http2/ISession.java | 22 +- .../java/org/eclipse/jetty/http2/IStream.java | 75 ++++++ .../http2/SimpleFlowControlStrategy.java | 12 +- .../org/eclipse/jetty/http2/api/Stream.java | 4 +- .../eclipse/jetty/http2/frames/DataFrame.java | 24 +- .../jetty/http2/frames/HeadersFrame.java | 21 +- .../jetty/http2/frames/PriorityFrame.java | 21 +- .../jetty/http2/frames/PushPromiseFrame.java | 19 +- .../jetty/http2/frames/StreamFrame.java | 37 +++ .../client/http/HttpSenderOverHTTP2.java | 106 ++++---- .../http2/server/HTTP2ServerSession.java | 5 +- .../http2/server/HttpTransportOverHTTP2.java | 254 +++++++++--------- 20 files changed, 491 insertions(+), 290 deletions(-) create mode 100644 jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 942bf7a683e..c9ce6f258ed 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -30,7 +30,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -603,7 +602,8 @@ public class HttpClient extends ContainerLifeCycle @Override public void succeeded(List socketAddresses) { - Map context = new HashMap<>(); + // Multiple threads may access the map, especially with DEBUG logging enabled. + Map context = new ConcurrentHashMap<>(); context.put(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, HttpClient.this); context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination); connect(socketAddresses, 0, context); diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 98a0d693ff9..351b1b2d9c2 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -25,9 +25,9 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory; @@ -433,7 +433,7 @@ public class HTTP2Client extends ContainerLifeCycle private Map contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise promise, Map context) { if (context == null) - context = new HashMap<>(); + context = new ConcurrentHashMap<>(); context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this); context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener); context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise); diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 217f9c64387..870788cb399 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.http2.client; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; @@ -127,11 +128,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory if (windowDelta > 0) { session.updateRecvWindow(windowDelta); - session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); + session.frames(null, Arrays.asList(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this); } else { - session.frames(null, this, prefaceFrame, settingsFrame); + session.frames(null, Arrays.asList(prefaceFrame, settingsFrame), this); } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 30a19b3ff0e..b554dcdc72f 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -27,6 +27,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -63,6 +64,7 @@ import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -198,14 +200,15 @@ public class StreamResetTest extends AbstractTest { // Simulate that there is pending data to send. IStream stream = (IStream)s; - stream.getSession().frames(stream, new Callback() + List frames = Collections.singletonList(new DataFrame(s.getId(), ByteBuffer.allocate(16), true)); + stream.getSession().frames(stream, frames, new Callback() { @Override public void failed(Throwable x) { serverResetLatch.countDown(); } - }, new DataFrame(s.getId(), ByteBuffer.allocate(16), true)); + }); } }; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index ded425c87c9..09dd3c3c3ac 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -18,11 +18,11 @@ package org.eclipse.jetty.http2; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.util.Atomics; import org.eclipse.jetty.util.Callback; @@ -160,7 +160,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame) { - session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY); + session.frames(stream, Collections.singletonList(frame), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index dcdf8666c2b..b228e4a340a 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -113,6 +113,25 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable return false; } + public boolean append(List list) + { + Throwable closed; + synchronized (this) + { + closed = terminated; + if (closed == null) + { + list.forEach(entries::offer); + if (LOG.isDebugEnabled()) + LOG.debug("Appended {}, entries={}", list, entries.size()); + } + } + if (closed == null) + return true; + list.forEach(entry -> closed(entry, closed)); + return false; + } + private int getWindowQueueSize() { synchronized (this) @@ -414,6 +433,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable public abstract long onFlushed(long bytes) throws IOException; + boolean hasHighPriority() + { + return false; + } + @Override public void failed(Throwable x) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 6fa8194a30c..cc3d724e950 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -23,8 +23,10 @@ import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; @@ -49,6 +52,7 @@ import org.eclipse.jetty.http2.frames.PriorityFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.StreamFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.hpack.HpackException; @@ -585,7 +589,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) { - streamCreator.newStream(frame, promise, listener); + newStream(new IStream.FrameList(frame), promise, listener); + } + + @Override + public void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener) + { + streamCreator.newStream(frames, promise, listener); } @Override @@ -692,46 +702,48 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void control(IStream stream, Callback callback, Frame frame) { - frames(stream, callback, frame, Frame.EMPTY_ARRAY); + frames(stream, Collections.singletonList(frame), callback); } @Override - public void frames(IStream stream, Callback callback, Frame frame, Frame... frames) + public void frames(IStream stream, List frames, Callback callback) { // We want to generate as late as possible to allow re-prioritization; // generation will happen while processing the entries. // The callback needs to be notified only when the last frame completes. - int length = frames.length; - if (length == 0) + int count = frames.size(); + if (count > 1) + callback = new CountingCallback(callback, count); + for (int i = 1; i <= count; ++i) { - frame(new ControlEntry(frame, stream, callback), true); - } - else - { - callback = new CountingCallback(callback, 1 + length); - frame(new ControlEntry(frame, stream, callback), false); - for (int i = 1; i <= length; ++i) - { - frame(new ControlEntry(frames[i - 1], stream, callback), i == length); - } + Frame frame = frames.get(i - 1); + HTTP2Flusher.Entry entry = newEntry(frame, stream, callback); + frame(entry, i == count); } } + private HTTP2Flusher.Entry newEntry(Frame frame, IStream stream, Callback callback) + { + return frame.getType() == FrameType.DATA + ? new DataEntry((DataFrame)frame, stream, callback) + : new ControlEntry(frame, stream, callback); + } + @Override public void data(IStream stream, Callback callback, DataFrame frame) { // We want to generate as late as possible to allow re-prioritization. - frame(new DataEntry(frame, stream, callback), true); + frame(newEntry(frame, stream, callback), true); } private void frame(HTTP2Flusher.Entry entry, boolean flush) { if (LOG.isDebugEnabled()) - LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this); + LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry, this); // Ping frames are prepended to process them as soon as possible. - boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); + boolean queued = entry.hasHighPriority() ? flusher.prepend(entry) : flusher.append(entry); if (queued && flush) { if (entry.stream != null) @@ -1278,6 +1290,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + @Override + boolean hasHighPriority() + { + return frame.getType() == FrameType.PING; + } + @Override public void succeeded() { @@ -1613,7 +1631,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void complete() { - frames(null, Callback.NOOP, newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()); + frames(null, Arrays.asList(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP); } } @@ -1672,29 +1690,30 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int streamId = reserveSlot(slot, currentStreamId); if (currentStreamId <= 0) - frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive()); + frame = frame.withStreamId(streamId); - slot.entry = new ControlEntry(frame, null, callback); + slot.entries = Collections.singletonList(newEntry(frame, null, callback)); flush(); return streamId; } - private void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) + private void newStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); - int currentStreamId = frame.getStreamId(); + int currentStreamId = frameList.getStreamId(); int streamId = reserveSlot(slot, currentStreamId); + List frames = frameList.getFrames(); if (currentStreamId <= 0) { - PriorityFrame priority = frame.getPriority(); - priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive()); - frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); } try { - createLocalStream(slot, frame, promise, listener, streamId); + createLocalStream(slot, frames, promise, listener, streamId); } catch (Throwable x) { @@ -1706,11 +1725,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { Slot slot = new Slot(); int streamId = reserveSlot(slot, 0); - frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); + frame = frame.withStreamId(streamId); try { - createLocalStream(slot, frame, promise, listener, streamId); + createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId); } catch (Throwable x) { @@ -1738,11 +1757,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return streamId; } - private void createLocalStream(Slot slot, Frame frame, Promise promise, Stream.Listener listener, int streamId) + private void createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) { IStream stream = HTTP2Session.this.createLocalStream(streamId); stream.setListener(listener); - slot.entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)); + int count = frames.size(); + Callback streamCallback = new StreamPromiseCallback(promise, stream); + Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count); + slot.entries = frames.stream() + .map(frame -> newEntry(frame, stream, callback)) + .collect(Collectors.toList()); flush(); } @@ -1757,16 +1781,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } /** - * Flush goes over the entries of the slots queue to flush the entries, - * until either one of the following two conditions is true: - * - The queue is empty. - * - It reaches a slot with a null entry. - * When a slot with a null entry is encountered, this means a concurrent thread reserved a slot - * but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering - * the null entry must bail out and it is up to the concurrent thread to finish up flushing. - * Note that only one thread can flush at any one time, if two threads happen to call flush - * concurrently, one will do the work while the other will bail out, so it is safe that all - * threads call flush after they're done reserving a slot and setting the entry. + *

Iterates over the entries of the slot queue to flush them.

+ *

The flush proceeds until either one of the following two conditions is true:

+ *
    + *
  • the queue is empty
  • + *
  • a slot with a no entries is encountered
  • + *
+ *

When a slot with a no entries is encountered, then it means that a concurrent thread reserved + * a slot but hasn't set its entries yet. Since slots must be flushed in order, the thread encountering + * the slot with no entries must bail out and it is up to the concurrent thread to finish up flushing.

+ *

Note that only one thread can flush at any time; if two threads happen to call this method + * concurrently, one will do the work while the other will bail out, so it is safe that all + * threads call this method after they are done reserving a slot and setting the entries.

*/ private void flush() { @@ -1774,7 +1800,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio boolean queued = false; while (true) { - ControlEntry entry; + List entries; synchronized (this) { if (flushing == null) @@ -1783,17 +1809,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return; // another thread is flushing Slot slot = slots.peek(); - entry = slot == null ? null : slot.entry; + entries = slot == null ? null : slot.entries; - if (entry == null) + if (entries == null) { flushing = null; - break; // No more slots or null entry, so we may iterate on the flusher + break; // No more slots or null entries, so we may iterate on the flusher } slots.poll(); } - queued |= flusher.append(entry); + queued |= flusher.append(entries); } if (queued) flusher.iterate(); @@ -1801,7 +1827,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class Slot { - private volatile ControlEntry entry; + private volatile List entries; } } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 0142a8515ad..e427057819b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.http2; import java.io.EOFException; import java.io.IOException; import java.nio.channels.WritePendingException; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -108,10 +109,16 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa } @Override - public void headers(HeadersFrame frame, Callback callback) + public void send(FrameList frameList, Callback callback) { if (startWrite(callback)) - session.frames(this, this, frame, Frame.EMPTY_ARRAY); + session.frames(this, frameList.getFrames(), this); + } + + @Override + public void headers(HeadersFrame frame, Callback callback) + { + send(new FrameList(frame), callback); } @Override @@ -137,7 +144,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa localReset = true; failure = new EOFException("reset"); } - session.frames(this, callback, frame, Frame.EMPTY_ARRAY); + session.frames(this, Collections.singletonList(frame), callback); } private boolean startWrite(Callback callback) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index bb4c751df96..f807128d866 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2; import java.io.IOException; +import java.util.List; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; @@ -47,18 +48,25 @@ public interface ISession extends Session void removeStream(IStream stream); /** - *

Enqueues the given frames to be written to the connection.

+ *

Sends the given list of frames to create a new {@link Stream}.

* - * @param stream the stream the frames belong to - * @param callback the callback that gets notified when the frames have been sent - * @param frame the first frame to enqueue - * @param frames additional frames to enqueue + * @param frames the list of frames to send + * @param promise the promise that gets notified of the stream creation + * @param listener the listener that gets notified of stream events */ - void frames(IStream stream, Callback callback, Frame frame, Frame... frames); + void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener); + + /** + *

Enqueues the given frames to be written to the connection.

+ * @param stream the stream the frames belong to + * @param frames additional frames to enqueue + * @param callback the callback that gets notified when the frames have been sent + */ + void frames(IStream stream, List frames, Callback callback); /** *

Enqueues the given PUSH_PROMISE frame to be written to the connection.

- *

Differently from {@link #frames(IStream, Callback, Frame, Frame...)}, this method + *

Differently from {@link #frames(IStream, List, Callback)}, this method * generates atomically the stream id for the pushed stream.

* * @param stream the stream associated to the pushed stream diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 52db438d073..4b94b568cd7 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -19,9 +19,16 @@ package org.eclipse.jetty.http2; import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.StreamFrame; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; @@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable */ void setListener(Listener listener); + /** + *

Sends the given list of frames.

+ *

Typically used to send an HTTP response along with content and possibly trailers.

+ * + * @param frameList the list of frames to send + * @param callback the callback that gets notified when the frames have been sent + */ + void send(FrameList frameList, Callback callback); + /** *

Processes the given {@code frame}, belonging to this stream.

* @@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable * @see Listener#onFailure(Stream, int, String, Throwable, Callback) */ boolean isResetOrFailed(); + + /** + *

An ordered list of frames belonging to the same stream.

+ */ + public static class FrameList + { + private final List frames; + + /** + *

Creates a frame list of just the given HEADERS frame.

+ * + * @param headers the HEADERS frame + */ + public FrameList(HeadersFrame headers) + { + Objects.requireNonNull(headers); + this.frames = Collections.singletonList(headers); + } + + /** + *

Creates a frame list of the given frames.

+ * + * @param headers the HEADERS frame for the headers + * @param data the DATA frame for the content, or null if there is no content + * @param trailers the HEADERS frame for the trailers, or null if there are no trailers + */ + public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers) + { + Objects.requireNonNull(headers); + ArrayList frames = new ArrayList<>(3); + int streamId = headers.getStreamId(); + if (data != null && data.getStreamId() != streamId) + throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data); + if (trailers != null && trailers.getStreamId() != streamId) + throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers); + frames.add(headers); + if (data != null) + frames.add(data); + if (trailers != null) + frames.add(trailers); + this.frames = Collections.unmodifiableList(frames); + } + + /** + * @return the stream ID of the frames in this list + */ + public int getStreamId() + { + return frames.get(0).getStreamId(); + } + + /** + * @return a List of non-null frames + */ + public List getFrames() + { + return frames; + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java index a59f324f014..4a1f5d41af5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java @@ -18,6 +18,9 @@ package org.eclipse.jetty.http2; +import java.util.ArrayList; +import java.util.List; + import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.util.Callback; @@ -44,12 +47,13 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy // This method is called when a whole flow controlled frame has been consumed. // We send a WindowUpdate every time, even if the frame was very small. + List frames = new ArrayList<>(2); WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length); + frames.add(sessionFrame); session.updateRecvWindow(length); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, increased session recv window by {} for {}", length, session); - Frame[] streamFrame = Frame.EMPTY_ARRAY; if (stream != null) { if (stream.isRemotelyClosed()) @@ -59,14 +63,14 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy } else { - streamFrame = new Frame[1]; - streamFrame[0] = new WindowUpdateFrame(stream.getId(), length); + WindowUpdateFrame streamFrame = new WindowUpdateFrame(stream.getId(), length); + frames.add(streamFrame); stream.updateRecvWindow(length); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream); } } - session.frames(stream, Callback.NOOP, sessionFrame, streamFrame); + session.frames(stream, frames, Callback.NOOP); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index 95d353bd54d..3b3db21ee23 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -51,7 +51,9 @@ public interface Stream Session getSession(); /** - *

Sends the given HEADERS {@code frame} representing an HTTP response.

+ *

Sends the given HEADERS {@code frame}.

+ *

Typically used to send an HTTP response with no content and no trailers, + * or to send the HTTP response trailers.

* * @param frame the HEADERS frame to send * @param callback the callback that gets notified when the frame has been sent diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java index f0b2e4d5cc2..c8da64c3845 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java @@ -20,13 +20,17 @@ package org.eclipse.jetty.http2.frames; import java.nio.ByteBuffer; -public class DataFrame extends Frame +public class DataFrame extends StreamFrame { - private final int streamId; private final ByteBuffer data; private final boolean endStream; private final int padding; + public DataFrame(ByteBuffer data, boolean endStream) + { + this(0, data, endStream); + } + public DataFrame(int streamId, ByteBuffer data, boolean endStream) { this(streamId, data, endStream, 0); @@ -34,18 +38,12 @@ public class DataFrame extends Frame public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding) { - super(FrameType.DATA); - this.streamId = streamId; + super(FrameType.DATA, streamId); this.data = data; this.endStream = endStream; this.padding = padding; } - public int getStreamId() - { - return streamId; - } - public ByteBuffer getData() { return data; @@ -72,9 +70,15 @@ public class DataFrame extends Frame return padding; } + @Override + public DataFrame withStreamId(int streamId) + { + return new DataFrame(streamId, getData(), isEndStream()); + } + @Override public String toString() { - return String.format("%s#%d{length:%d,end=%b}", super.toString(), streamId, data.remaining(), endStream); + return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), data.remaining(), endStream); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java index e6b1c577c1b..1186917277a 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java @@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames; import org.eclipse.jetty.http.MetaData; -public class HeadersFrame extends Frame +public class HeadersFrame extends StreamFrame { - private final int streamId; private final MetaData metaData; private final PriorityFrame priority; private final boolean endStream; @@ -53,18 +52,12 @@ public class HeadersFrame extends Frame */ public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream) { - super(FrameType.HEADERS); - this.streamId = streamId; + super(FrameType.HEADERS, streamId); this.metaData = metaData; this.priority = priority; this.endStream = endStream; } - public int getStreamId() - { - return streamId; - } - public MetaData getMetaData() { return metaData; @@ -80,10 +73,18 @@ public class HeadersFrame extends Frame return endStream; } + @Override + public HeadersFrame withStreamId(int streamId) + { + PriorityFrame priority = getPriority(); + priority = priority == null ? null : priority.withStreamId(streamId); + return new HeadersFrame(streamId, getMetaData(), priority, isEndStream()); + } + @Override public String toString() { - return String.format("%s#%d{end=%b}%s", super.toString(), streamId, endStream, + return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream, priority == null ? "" : String.format("+%s", priority)); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java index 61d5d2f065e..b6f05a88f69 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java @@ -18,11 +18,10 @@ package org.eclipse.jetty.http2.frames; -public class PriorityFrame extends Frame +public class PriorityFrame extends StreamFrame { public static final int PRIORITY_LENGTH = 5; - private final int streamId; private final int parentStreamId; private final int weight; private final boolean exclusive; @@ -34,20 +33,14 @@ public class PriorityFrame extends Frame public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive) { - super(FrameType.PRIORITY); - this.streamId = streamId; + super(FrameType.PRIORITY, streamId); this.parentStreamId = parentStreamId; this.weight = weight; this.exclusive = exclusive; } - public int getStreamId() - { - return streamId; - } - /** - * @return int of the Parent Stream + * @return {@code int} of the Parent Stream * @deprecated use {@link #getParentStreamId()} instead. */ @Deprecated @@ -71,9 +64,15 @@ public class PriorityFrame extends Frame return exclusive; } + @Override + public PriorityFrame withStreamId(int streamId) + { + return new PriorityFrame(streamId, getParentStreamId(), getWeight(), isExclusive()); + } + @Override public String toString() { - return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive); + return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), getStreamId(), parentStreamId, weight, exclusive); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java index 1099b55fd08..663d9be4d71 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java @@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames; import org.eclipse.jetty.http.MetaData; -public class PushPromiseFrame extends Frame +public class PushPromiseFrame extends StreamFrame { - private final int streamId; private final int promisedStreamId; private final MetaData metaData; @@ -33,17 +32,11 @@ public class PushPromiseFrame extends Frame public PushPromiseFrame(int streamId, int promisedStreamId, MetaData metaData) { - super(FrameType.PUSH_PROMISE); - this.streamId = streamId; + super(FrameType.PUSH_PROMISE, streamId); this.promisedStreamId = promisedStreamId; this.metaData = metaData; } - public int getStreamId() - { - return streamId; - } - public int getPromisedStreamId() { return promisedStreamId; @@ -54,9 +47,15 @@ public class PushPromiseFrame extends Frame return metaData; } + @Override + public PushPromiseFrame withStreamId(int streamId) + { + return new PushPromiseFrame(getStreamId(), streamId, getMetaData()); + } + @Override public String toString() { - return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId); + return String.format("%s#%d/#%d", super.toString(), getStreamId(), promisedStreamId); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java new file mode 100644 index 00000000000..20d8cf03a28 --- /dev/null +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java @@ -0,0 +1,37 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.frames; + +public abstract class StreamFrame extends Frame +{ + private final int streamId; + + public StreamFrame(FrameType type, int streamId) + { + super(type); + this.streamId = streamId; + } + + public int getStreamId() + { + return streamId; + } + + public abstract StreamFrame withStreamId(int streamId); +} diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 10d99bf6948..4e171aa4efa 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -29,10 +29,12 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -50,64 +52,62 @@ public class HttpSenderOverHTTP2 extends HttpSender } @Override - protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback) + protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback) { HttpRequest request = exchange.getRequest(); String path = relativize(request.getPath()); HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null); MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders()); - Supplier trailerSupplier = request.getTrailers(); - metaData.setTrailerSupplier(trailerSupplier); + metaData.setTrailerSupplier(request.getTrailers()); HeadersFrame headersFrame; - Promise promise; + DataFrame dataFrame = null; + HeadersFrame trailersFrame = null; + if (content.hasContent()) { headersFrame = new HeadersFrame(metaData, null, false); - promise = new HeadersPromise(request, callback) + if (!expects100Continue(request)) { - @Override - public void succeeded(Stream stream) + boolean advanced = content.advance(); + boolean lastContent = content.isLast(); + if (advanced) { - super.succeeded(stream); - if (expects100Continue(request)) + if (lastContent) { - // Don't send the content yet. - callback.succeeded(); + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + dataFrame = new DataFrame(content.getByteBuffer(), !hasTrailers); + if (hasTrailers) + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true); } else { - boolean advanced = content.advance(); - boolean lastContent = content.isLast(); - if (advanced || lastContent) - sendContent(stream, content, trailerSupplier, callback); - else - callback.succeeded(); + dataFrame = new DataFrame(content.getByteBuffer(), false); } } - }; + else if (lastContent) + { + HttpFields trailers = retrieveTrailers(request); + if (trailers != null) + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true); + else + dataFrame = new DataFrame(BufferUtil.EMPTY_BUFFER, true); + } + } } else { - HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); - boolean endStream = trailers == null || trailers.size() == 0; - headersFrame = new HeadersFrame(metaData, null, endStream); - promise = new HeadersPromise(request, callback) - { - @Override - public void succeeded(Stream stream) - { - super.succeeded(stream); - if (endStream) - callback.succeeded(); - else - sendTrailers(stream, trailers, callback); - } - }; + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + headersFrame = new HeadersFrame(metaData, null, !hasTrailers); + if (hasTrailers) + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true); } - // TODO optimize the send of HEADERS and DATA frames. + HttpChannelOverHTTP2 channel = getHttpChannel(); - channel.getSession().newStream(headersFrame, promise, channel.getStreamListener()); + IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame); + ((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener()); } private String relativize(String path) @@ -128,6 +128,13 @@ public class HttpSenderOverHTTP2 extends HttpSender } } + private HttpFields retrieveTrailers(HttpRequest request) + { + Supplier trailerSupplier = request.getTrailers(); + HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); + return trailers == null || trailers.size() == 0 ? null : trailers; + } + @Override protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback) { @@ -140,25 +147,27 @@ public class HttpSenderOverHTTP2 extends HttpSender } else { - Stream stream = getHttpChannel().getStream(); - Supplier trailerSupplier = exchange.getRequest().getTrailers(); - sendContent(stream, content, trailerSupplier, callback); + sendContent(getHttpChannel().getStream(), exchange.getRequest(), content, callback); } } - private void sendContent(Stream stream, HttpContent content, Supplier trailerSupplier, Callback callback) + private void sendContent(Stream stream, HttpRequest request, HttpContent content, Callback callback) { - boolean lastContent = content.isLast(); - HttpFields trailers = null; - boolean endStream = false; - if (lastContent) + if (content.isLast()) { - trailers = trailerSupplier == null ? null : trailerSupplier.get(); - endStream = trailers == null || trailers.size() == 0; + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), !hasTrailers); + if (hasTrailers) + stream.data(dataFrame, Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed)); + else + stream.data(dataFrame, callback); + } + else + { + DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), false); + stream.data(dataFrame, callback); } - DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream); - HttpFields fTrailers = trailers; - stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed)); } private void sendTrailers(Stream stream, HttpFields trailers, Callback callback) @@ -188,6 +197,7 @@ public class HttpSenderOverHTTP2 extends HttpSender long idleTimeout = request.getIdleTimeout(); if (idleTimeout >= 0) stream.setIdleTimeout(idleTimeout); + callback.succeeded(); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 04441463067..1a5c7079b3a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.http2.server; +import java.util.Arrays; import java.util.Collections; import java.util.Map; @@ -72,9 +73,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis } if (windowFrame == null) - frames(null, Callback.NOOP, settingsFrame, Frame.EMPTY_ARRAY); + frames(null, Collections.singletonList(settingsFrame), Callback.NOOP); else - frames(null, Callback.NOOP, settingsFrame, windowFrame); + frames(null, Arrays.asList(settingsFrame, windowFrame), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index f48b2aa5978..f4a396ff31b 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -89,140 +89,154 @@ public class HttpTransportOverHTTP2 implements HttpTransport @Override public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer content, boolean lastContent, Callback callback) { - boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; if (info != null) + sendHeaders(info, content, lastContent, isHeadRequest, callback); + else + sendContent(content, lastContent, isHeadRequest, callback); + } + + private void sendHeaders(MetaData.Response info, ByteBuffer content, boolean lastContent, boolean isHeadRequest, Callback callback) + { + metaData = info; + + HeadersFrame headersFrame; + DataFrame dataFrame = null; + HeadersFrame trailersFrame = null; + + boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + int status = info.getStatus(); + boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102; + if (interimResponse) { - metaData = info; - int status = info.getStatus(); - boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102; - if (interimResponse) + // Must not commit interim responses. + if (hasContent) { - // Must not commit interim responses. + callback.failed(new IllegalStateException("Interim response cannot have content")); + return; + } + headersFrame = new HeadersFrame(stream.getId(), info, null, false); + } + else + { + if (commit.compareAndSet(false, true)) + { + if (lastContent) + { + long realContentLength = BufferUtil.length(content); + long contentLength = info.getContentLength(); + if (contentLength < 0) + { + info.setContentLength(realContentLength); + } + else if (hasContent && contentLength != realContentLength) + { + callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength))); + return; + } + } + if (hasContent) { - callback.failed(new IllegalStateException("Interim response cannot have content")); + headersFrame = new HeadersFrame(stream.getId(), info, null, false); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + dataFrame = new DataFrame(stream.getId(), content, true); + } + else + { + dataFrame = new DataFrame(stream.getId(), content, false); + trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + } + else + { + dataFrame = new DataFrame(stream.getId(), content, false); + } } else { - transportCallback.send(callback, false, c -> - sendHeadersFrame(info, false, c)); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + headersFrame = new HeadersFrame(stream.getId(), info, null, true); + } + else + { + headersFrame = new HeadersFrame(stream.getId(), info, null, false); + trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + } + else + { + headersFrame = new HeadersFrame(stream.getId(), info, null, false); + } } } else { - if (commit.compareAndSet(false, true)) - { - if (lastContent) - { - long realContentLength = BufferUtil.length(content); - long contentLength = info.getContentLength(); - if (contentLength < 0) - { - info.setContentLength(realContentLength); - } - else if (hasContent && contentLength != realContentLength) - { - callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength))); - return; - } - } + callback.failed(new IllegalStateException("Response already committed")); + return; + } + } - if (hasContent) - { - Callback commitCallback = new Callback.Nested(callback) - { - @Override - public void succeeded() - { - if (lastContent) - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - transportCallback.send(new SendTrailers(getCallback(), trailers), false, c -> - sendDataFrame(content, true, false, c)); - } - else - { - transportCallback.send(getCallback(), false, c -> - sendDataFrame(content, true, true, c)); - } - } - else - { - transportCallback.send(getCallback(), false, c -> - sendDataFrame(content, false, false, c)); - } - } - }; - transportCallback.send(commitCallback, true, c -> - sendHeadersFrame(info, false, c)); - } - else - { - if (lastContent) - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - transportCallback.send(new SendTrailers(callback, trailers), true, c -> - sendHeadersFrame(info, false, c)); - } - else - { - transportCallback.send(callback, true, c -> - sendHeadersFrame(info, true, c)); - } - } - else - { - transportCallback.send(callback, true, c -> - sendHeadersFrame(info, false, c)); - } - } + HeadersFrame hf = headersFrame; + DataFrame df = dataFrame; + HeadersFrame tf = trailersFrame; + + transportCallback.send(callback, true, c -> + { + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(), + System.lineSeparator(), info.getFields()); + } + stream.send(new IStream.FrameList(hf, df, tf), c); + }); + } + + private void sendContent(ByteBuffer content, boolean lastContent, boolean isHeadRequest, Callback callback) + { + boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + if (hasContent || lastContent) + { + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, true, true, c)); } else { - callback.failed(new IllegalStateException("committed")); + SendTrailers sendTrailers = new SendTrailers(callback, trailers); + if (hasContent) + { + transportCallback.send(sendTrailers, false, c -> + sendDataFrame(content, true, false, c)); + } + else + { + sendTrailers.succeeded(); + } } } + else + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, false, false, c)); + } } else { - if (hasContent || lastContent) - { - if (lastContent) - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - SendTrailers sendTrailers = new SendTrailers(callback, trailers); - if (hasContent) - { - transportCallback.send(sendTrailers, false, c -> - sendDataFrame(content, true, false, c)); - } - else - { - sendTrailers.succeeded(); - } - } - else - { - transportCallback.send(callback, false, c -> - sendDataFrame(content, true, true, c)); - } - } - else - { - transportCallback.send(callback, false, c -> - sendDataFrame(content, false, false, c)); - } - } - else - { - callback.succeeded(); - } + callback.succeeded(); } } @@ -273,20 +287,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport }, new Stream.Listener.Adapter()); // TODO: handle reset from the client ? } - private void sendHeadersFrame(MetaData.Response info, boolean endStream, Callback callback) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}", - stream.getId(), Integer.toHexString(stream.getSession().hashCode()), - System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(), - System.lineSeparator(), info.getFields()); - } - - HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream); - stream.headers(frame, callback); - } - private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback) { if (LOG.isDebugEnabled())