diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 34e434c3827..83add784126 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -563,7 +562,8 @@ public class HttpClient extends ContainerLifeCycle @Override public void succeeded(List socketAddresses) { - Map context = new HashMap<>(); + // Multiple threads may access the map, especially with DEBUG logging enabled. + Map context = new ConcurrentHashMap<>(); context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this); context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination); connect(socketAddresses, 0, context); diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 2e26a34f2c4..59951fb127c 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -21,10 +21,10 @@ package org.eclipse.jetty.http2.client; import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.time.Duration; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory; @@ -391,7 +391,7 @@ public class HTTP2Client extends ContainerLifeCycle private Map contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise promise, Map context) { if (context == null) - context = new HashMap<>(); + context = new ConcurrentHashMap<>(); context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this); context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener); context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise); diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 49f252e3f69..dec64e09cd0 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2.client; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -129,11 +130,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory if (windowDelta > 0) { session.updateRecvWindow(windowDelta); - session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); + session.frames(null, List.of(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this); } else { - session.frames(null, this, prefaceFrame, settingsFrame); + session.frames(null, List.of(prefaceFrame, settingsFrame), this); } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 5931417a2ed..ef2fa0e290f 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -63,6 +63,7 @@ import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -198,14 +199,15 @@ public class StreamResetTest extends AbstractTest { // Simulate that there is pending data to send. IStream stream = (IStream)s; - stream.getSession().frames(stream, new Callback() + List frames = List.of(new DataFrame(s.getId(), ByteBuffer.allocate(16), true)); + stream.getSession().frames(stream, frames, new Callback() { @Override public void failed(Throwable x) { serverResetLatch.countDown(); } - }, new DataFrame(s.getId(), ByteBuffer.allocate(16), true)); + }); } }; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index bebbd440682..1c722af1c43 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -18,11 +18,11 @@ package org.eclipse.jetty.http2; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.util.Atomics; import org.eclipse.jetty.util.Callback; @@ -171,7 +171,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame) { - session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY); + session.frames(stream, List.of(frame), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index cc11d2fb508..0c78b69e74e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -115,6 +115,25 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable return false; } + public boolean append(List list) + { + Throwable closed; + synchronized (this) + { + closed = terminated; + if (closed == null) + { + list.forEach(entries::offer); + if (LOG.isDebugEnabled()) + LOG.debug("Appended {}, entries={}", list, entries.size()); + } + } + if (closed == null) + return true; + list.forEach(entry -> closed(entry, closed)); + return false; + } + private int getWindowQueueSize() { try (AutoLock l = lock.lock()) @@ -416,6 +435,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable public abstract long onFlushed(long bytes) throws IOException; + boolean hasHighPriority() + { + return false; + } + @Override public void failed(Throwable x) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 4c065da0e04..e95e34e024d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -26,6 +26,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -36,6 +37,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.api.Session; @@ -53,6 +55,7 @@ import org.eclipse.jetty.http2.frames.PriorityFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.StreamFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.hpack.HpackException; @@ -602,7 +605,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) { - streamCreator.newStream(frame, promise, listener); + newStream(new IStream.FrameList(frame), promise, listener); + } + + @Override + public void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener) + { + streamCreator.newStream(frames, promise, listener); } /** @@ -617,11 +626,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio */ public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut) { - int streamId = frameIn.getStreamId(); + HeadersFrame frame = frameIn; + int streamId = frame.getStreamId(); if (streamId <= 0) + { streamId = localStreamIds.getAndAdd(2); + frame = frame.withStreamId(streamId); + } - HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn); if (frameOut != null) frameOut[0] = frame; @@ -749,46 +761,48 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void control(IStream stream, Callback callback, Frame frame) { - frames(stream, callback, frame, Frame.EMPTY_ARRAY); + frames(stream, List.of(frame), callback); } @Override - public void frames(IStream stream, Callback callback, Frame frame, Frame... frames) + public void frames(IStream stream, List frames, Callback callback) { // We want to generate as late as possible to allow re-prioritization; // generation will happen while processing the entries. // The callback needs to be notified only when the last frame completes. - int length = frames.length; - if (length == 0) + int count = frames.size(); + if (count > 1) + callback = new CountingCallback(callback, count); + for (int i = 1; i <= count; ++i) { - frame(new ControlEntry(frame, stream, callback), true); - } - else - { - callback = new CountingCallback(callback, 1 + length); - frame(new ControlEntry(frame, stream, callback), false); - for (int i = 1; i <= length; ++i) - { - frame(new ControlEntry(frames[i - 1], stream, callback), i == length); - } + Frame frame = frames.get(i - 1); + HTTP2Flusher.Entry entry = newEntry(frame, stream, callback); + frame(entry, i == count); } } + private HTTP2Flusher.Entry newEntry(Frame frame, IStream stream, Callback callback) + { + return frame.getType() == FrameType.DATA + ? new DataEntry((DataFrame)frame, stream, callback) + : new ControlEntry(frame, stream, callback); + } + @Override public void data(IStream stream, Callback callback, DataFrame frame) { // We want to generate as late as possible to allow re-prioritization. - frame(new DataEntry(frame, stream, callback), true); + frame(newEntry(frame, stream, callback), true); } private void frame(HTTP2Flusher.Entry entry, boolean flush) { if (LOG.isDebugEnabled()) - LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this); + LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry, this); // Ping frames are prepended to process them as soon as possible. - boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); + boolean queued = entry.hasHighPriority() ? flusher.prepend(entry) : flusher.append(entry); if (queued && flush) { if (entry.stream != null) @@ -1356,6 +1370,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + @Override + boolean hasHighPriority() + { + return frame.getType() == FrameType.PING; + } + @Override public void succeeded() { @@ -1700,7 +1720,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private void complete() { - frames(null, Callback.NOOP, newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()); + frames(null, List.of(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP); } } @@ -1760,26 +1780,40 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int streamId = reserveSlot(slot, currentStreamId); if (currentStreamId <= 0) - frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive()); + frame = frame.withStreamId(streamId); - assignSlotAndFlush(slot, new ControlEntry(frame, null, callback)); + assignSlotAndFlush(slot, newEntry(frame, null, callback)); return streamId; } - private void newStream(HeadersFrame frame, Promise promise, Stream.Listener listener) + private void newStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); - int currentStreamId = frame.getStreamId(); + int currentStreamId = frameList.getStreamId(); int streamId = reserveSlot(slot, currentStreamId); - frame = prepareHeadersFrame(streamId, frame); + List frames = frameList.getFrames(); + if (currentStreamId <= 0) + { + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); + } try { - IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData()); + HeadersFrame headersFrame = (HeadersFrame)frameList.getFrames().get(0); + IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)headersFrame.getMetaData()); stream.setListener(listener); stream.process(new PrefaceFrame(), Callback.NOOP); - assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream))); + + int count = frames.size(); + Callback streamCallback = new StreamPromiseCallback(promise, stream); + Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count); + List entries = frames.stream() + .map(frame -> newEntry(frame, stream, callback)) + .collect(Collectors.toList()); + assignSlotAndFlush(slot, entries); } catch (Throwable x) { @@ -1787,28 +1821,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame) - { - if (frame.getStreamId() <= 0) - { - PriorityFrame priority = frame.getPriority(); - priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive()); - frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); - } - return frame; - } - private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); int streamId = reserveSlot(slot, 0); - frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); + frame = frame.withStreamId(streamId); try { IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData()); stream.setListener(listener); - assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream))); + assignSlotAndFlush(slot, newEntry(frame, stream, new StreamPromiseCallback(promise, stream))); } catch (Throwable x) { @@ -1816,13 +1839,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private void assignSlotAndFlush(Slot slot, ControlEntry entry) - { - // Every time a slot entry is assigned, we must flush. - slot.entry = entry; - flush(); - } - private int reserveSlot(Slot slot, int streamId) { if (streamId <= 0) @@ -1843,6 +1859,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return streamId; } + private void assignSlotAndFlush(Slot slot, HTTP2Flusher.Entry entry) + { + assignSlotAndFlush(slot, List.of(entry)); + } + + private void assignSlotAndFlush(Slot slot, List entries) + { + // Every time a slot entry is assigned, we must flush. + slot.entries = entries; + flush(); + } + private void releaseSlotFlushAndFail(Slot slot, Promise promise, Throwable x) { try (AutoLock l = lock.lock()) @@ -1854,16 +1882,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } /** - * Flush goes over the entries of the slots queue to flush the entries, - * until either one of the following two conditions is true: - * - The queue is empty. - * - It reaches a slot with a null entry. - * When a slot with a null entry is encountered, this means a concurrent thread reserved a slot - * but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering - * the null entry must bail out and it is up to the concurrent thread to finish up flushing. - * Note that only one thread can flush at any one time, if two threads happen to call flush - * concurrently, one will do the work while the other will bail out, so it is safe that all - * threads call flush after they're done reserving a slot and setting the entry. + *

Iterates over the entries of the slot queue to flush them.

+ *

The flush proceeds until either one of the following two conditions is true:

+ *
    + *
  • the queue is empty
  • + *
  • a slot with a no entries is encountered
  • + *
+ *

When a slot with a no entries is encountered, then it means that a concurrent thread reserved + * a slot but hasn't set its entries yet. Since slots must be flushed in order, the thread encountering + * the slot with no entries must bail out and it is up to the concurrent thread to finish up flushing.

+ *

Note that only one thread can flush at any time; if two threads happen to call this method + * concurrently, one will do the work while the other will bail out, so it is safe that all + * threads call this method after they are done reserving a slot and setting the entries.

*/ private void flush() { @@ -1871,7 +1901,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio boolean queued = false; while (true) { - ControlEntry entry; + List entries; try (AutoLock l = lock.lock()) { if (flushing == null) @@ -1880,18 +1910,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return; // Another thread is flushing. Slot slot = slots.peek(); - entry = slot == null ? null : slot.entry; + entries = slot == null ? null : slot.entries; - if (entry == null) + if (entries == null) { flushing = null; - // No more slots or null entry, so we may iterate on the flusher. + // No more slots or null entries, so we may iterate on the flusher. break; } slots.poll(); } - queued |= flusher.append(entry); + queued |= flusher.append(entries); } if (queued) flusher.iterate(); @@ -1899,7 +1929,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class Slot { - private volatile ControlEntry entry; + private volatile List entries; } } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index b25b3cfdc4b..973c957d4c5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -22,6 +22,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.channels.WritePendingException; import java.util.ArrayDeque; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -122,9 +123,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa @Override public void headers(HeadersFrame frame, Callback callback) + { + send(new FrameList(frame), callback); + } + + @Override + public void send(FrameList frameList, Callback callback) { if (startWrite(callback)) - session.frames(this, this, frame, Frame.EMPTY_ARRAY); + session.frames(this, frameList.getFrames(), this); } @Override @@ -150,7 +157,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa localReset = true; failure = new EOFException("reset"); } - session.frames(this, callback, frame, Frame.EMPTY_ARRAY); + session.frames(this, List.of(frame), callback); } private boolean startWrite(Callback callback) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index 4ad49b481f8..1fcc716021c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2; import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http2.api.Session; @@ -48,18 +49,25 @@ public interface ISession extends Session public void removeStream(IStream stream); /** - *

Enqueues the given frames to be written to the connection.

+ *

Sends the given list of frames to create a new {@link Stream}.

* - * @param stream the stream the frames belong to - * @param callback the callback that gets notified when the frames have been sent - * @param frame the first frame to enqueue - * @param frames additional frames to enqueue + * @param frames the list of frames to send + * @param promise the promise that gets notified of the stream creation + * @param listener the listener that gets notified of stream events */ - public void frames(IStream stream, Callback callback, Frame frame, Frame... frames); + public void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener); + + /** + *

Enqueues the given frames to be written to the connection.

+ * @param stream the stream the frames belong to + * @param frames the frames to enqueue + * @param callback the callback that gets notified when the frames have been sent + */ + public void frames(IStream stream, List frames, Callback callback); /** *

Enqueues the given PUSH_PROMISE frame to be written to the connection.

- *

Differently from {@link #frames(IStream, Callback, Frame, Frame...)}, this method + *

Differently from {@link #frames(IStream, List, Callback)}, this method * generates atomically the stream id for the pushed stream.

* * @param stream the stream associated to the pushed stream diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 3747150c354..81a38ae9f7b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -19,9 +19,16 @@ package org.eclipse.jetty.http2; import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.StreamFrame; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; @@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable */ public void setListener(Listener listener); + /** + *

Sends the given list of frames.

+ *

Typically used to send HTTP headers along with content and possibly trailers.

+ * + * @param frameList the list of frames to send + * @param callback the callback that gets notified when the frames have been sent + */ + void send(FrameList frameList, Callback callback); + /** *

Processes the given {@code frame}, belonging to this stream.

* @@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable * @see Listener#onFailure(Stream, int, String, Throwable, Callback) */ boolean isResetOrFailed(); + + /** + *

An ordered list of frames belonging to the same stream.

+ */ + public static class FrameList + { + private final List frames; + + /** + *

Creates a frame list of just the given HEADERS frame.

+ * + * @param headers the HEADERS frame + */ + public FrameList(HeadersFrame headers) + { + Objects.requireNonNull(headers); + this.frames = List.of(headers); + } + + /** + *

Creates a frame list of the given frames.

+ * + * @param headers the HEADERS frame for the headers + * @param data the DATA frame for the content, or null if there is no content + * @param trailers the HEADERS frame for the trailers, or null if there are no trailers + */ + public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers) + { + Objects.requireNonNull(headers); + ArrayList frames = new ArrayList<>(3); + int streamId = headers.getStreamId(); + if (data != null && data.getStreamId() != streamId) + throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data); + if (trailers != null && trailers.getStreamId() != streamId) + throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers); + frames.add(headers); + if (data != null) + frames.add(data); + if (trailers != null) + frames.add(trailers); + this.frames = Collections.unmodifiableList(frames); + } + + /** + * @return the stream ID of the frames in this list + */ + public int getStreamId() + { + return frames.get(0).getStreamId(); + } + + /** + * @return a List of non-null frames + */ + public List getFrames() + { + return frames; + } + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java index d597d7e5ad3..c5dc1967265 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java @@ -18,6 +18,9 @@ package org.eclipse.jetty.http2; +import java.util.ArrayList; +import java.util.List; + import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.util.Callback; @@ -44,12 +47,13 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy // This method is called when a whole flow controlled frame has been consumed. // We send a WindowUpdate every time, even if the frame was very small. - final WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length); + List frames = new ArrayList<>(2); + WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length); + frames.add(sessionFrame); session.updateRecvWindow(length); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, increased session recv window by {} for {}", length, session); - Frame[] streamFrame = Frame.EMPTY_ARRAY; if (stream != null) { if (stream.isRemotelyClosed()) @@ -59,14 +63,14 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy } else { - streamFrame = new Frame[1]; - streamFrame[0] = new WindowUpdateFrame(stream.getId(), length); + WindowUpdateFrame streamFrame = new WindowUpdateFrame(stream.getId(), length); + frames.add(streamFrame); stream.updateRecvWindow(length); if (LOG.isDebugEnabled()) LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream); } } - session.frames(stream, Callback.NOOP, sessionFrame, streamFrame); + session.frames(stream, frames, Callback.NOOP); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index 73fee4e7b32..bf41f0fe2d2 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -66,7 +66,8 @@ public interface Stream } /** - *

Sends the given HEADERS {@code frame} representing an HTTP response.

+ *

Sends the given HEADERS {@code frame}.

+ *

Typically used to send an HTTP response or to send the HTTP response trailers.

* * @param frame the HEADERS frame to send * @param callback the callback that gets notified when the frame has been sent diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java index c44b3bc74be..4e404b63387 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/DataFrame.java @@ -20,13 +20,17 @@ package org.eclipse.jetty.http2.frames; import java.nio.ByteBuffer; -public class DataFrame extends Frame +public class DataFrame extends StreamFrame { - private final int streamId; private final ByteBuffer data; private final boolean endStream; private final int padding; + public DataFrame(ByteBuffer data, boolean endStream) + { + this(0, data, endStream); + } + public DataFrame(int streamId, ByteBuffer data, boolean endStream) { this(streamId, data, endStream, 0); @@ -34,18 +38,12 @@ public class DataFrame extends Frame public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding) { - super(FrameType.DATA); - this.streamId = streamId; + super(FrameType.DATA, streamId); this.data = data; this.endStream = endStream; this.padding = padding; } - public int getStreamId() - { - return streamId; - } - public ByteBuffer getData() { return data; @@ -72,9 +70,15 @@ public class DataFrame extends Frame return padding; } + @Override + public DataFrame withStreamId(int streamId) + { + return new DataFrame(streamId, getData(), isEndStream()); + } + @Override public String toString() { - return String.format("%s#%d{length:%d,end=%b}", super.toString(), streamId, data.remaining(), endStream); + return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), data.remaining(), endStream); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java index c72b4933628..ceddbc6aed1 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java @@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames; import org.eclipse.jetty.http.MetaData; -public class HeadersFrame extends Frame +public class HeadersFrame extends StreamFrame { - private final int streamId; private final MetaData metaData; private final PriorityFrame priority; private final boolean endStream; @@ -53,18 +52,12 @@ public class HeadersFrame extends Frame */ public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream) { - super(FrameType.HEADERS); - this.streamId = streamId; + super(FrameType.HEADERS, streamId); this.metaData = metaData; this.priority = priority; this.endStream = endStream; } - public int getStreamId() - { - return streamId; - } - public MetaData getMetaData() { return metaData; @@ -80,10 +73,18 @@ public class HeadersFrame extends Frame return endStream; } + @Override + public HeadersFrame withStreamId(int streamId) + { + PriorityFrame priority = getPriority(); + priority = priority == null ? null : priority.withStreamId(streamId); + return new HeadersFrame(streamId, getMetaData(), priority, isEndStream()); + } + @Override public String toString() { - return String.format("%s#%d{end=%b}%s", super.toString(), streamId, endStream, + return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream, priority == null ? "" : String.format("+%s", priority)); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java index a7fad1a5129..46dbc8610b2 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java @@ -18,11 +18,10 @@ package org.eclipse.jetty.http2.frames; -public class PriorityFrame extends Frame +public class PriorityFrame extends StreamFrame { public static final int PRIORITY_LENGTH = 5; - private final int streamId; private final int parentStreamId; private final int weight; private final boolean exclusive; @@ -34,18 +33,12 @@ public class PriorityFrame extends Frame public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive) { - super(FrameType.PRIORITY); - this.streamId = streamId; + super(FrameType.PRIORITY, streamId); this.parentStreamId = parentStreamId; this.weight = weight; this.exclusive = exclusive; } - public int getStreamId() - { - return streamId; - } - public int getParentStreamId() { return parentStreamId; @@ -61,9 +54,15 @@ public class PriorityFrame extends Frame return exclusive; } + @Override + public PriorityFrame withStreamId(int streamId) + { + return new PriorityFrame(streamId, getParentStreamId(), getWeight(), isExclusive()); + } + @Override public String toString() { - return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive); + return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), getStreamId(), parentStreamId, weight, exclusive); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java index ee19dffeb89..ff33e939ef8 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PushPromiseFrame.java @@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames; import org.eclipse.jetty.http.MetaData; -public class PushPromiseFrame extends Frame +public class PushPromiseFrame extends StreamFrame { - private final int streamId; private final int promisedStreamId; private final MetaData.Request metaData; @@ -33,17 +32,11 @@ public class PushPromiseFrame extends Frame public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData) { - super(FrameType.PUSH_PROMISE); - this.streamId = streamId; + super(FrameType.PUSH_PROMISE, streamId); this.promisedStreamId = promisedStreamId; this.metaData = metaData; } - public int getStreamId() - { - return streamId; - } - public int getPromisedStreamId() { return promisedStreamId; @@ -54,9 +47,15 @@ public class PushPromiseFrame extends Frame return metaData; } + @Override + public PushPromiseFrame withStreamId(int streamId) + { + return new PushPromiseFrame(getStreamId(), streamId, getMetaData()); + } + @Override public String toString() { - return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId); + return String.format("%s#%d/#%d", super.toString(), getStreamId(), promisedStreamId); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java new file mode 100644 index 00000000000..7e6665c1c30 --- /dev/null +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/StreamFrame.java @@ -0,0 +1,37 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.frames; + +public abstract class StreamFrame extends Frame +{ + private final int streamId; + + public StreamFrame(FrameType type, int streamId) + { + super(type); + this.streamId = streamId; + } + + public int getStreamId() + { + return streamId; + } + + public abstract StreamFrame withStreamId(int streamId); +} diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index b45787920ac..4dcc08d0f83 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.http2.client.http; import java.net.URI; import java.nio.ByteBuffer; -import java.util.function.Consumer; import java.util.function.Supplier; import org.eclipse.jetty.client.HttpExchange; @@ -32,6 +31,8 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -57,7 +58,7 @@ public class HttpSenderOverHTTP2 extends HttpSender } @Override - protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, final Callback callback) + protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback) { HttpRequest request = exchange.getRequest(); boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod()); @@ -88,38 +89,52 @@ public class HttpSenderOverHTTP2 extends HttpSender } HeadersFrame headersFrame; - Promise promise; + DataFrame dataFrame = null; + HeadersFrame trailersFrame = null; + if (isTunnel) { headersFrame = new HeadersFrame(metaData, null, false); - promise = new HeadersPromise(request, callback, stream -> callback.succeeded()); } else { - Supplier trailerSupplier = request.getTrailers(); - if (BufferUtil.isEmpty(contentBuffer) && lastContent) + boolean hasContent = BufferUtil.hasContent(contentBuffer); + if (hasContent) { - HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); - boolean endStream = trailers == null || trailers.size() == 0; - headersFrame = new HeadersFrame(metaData, null, endStream); - promise = new HeadersPromise(request, callback, stream -> + headersFrame = new HeadersFrame(metaData, null, false); + if (lastContent) { - if (endStream) - callback.succeeded(); - else - sendTrailers(stream, trailers, callback); - }); + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + dataFrame = new DataFrame(contentBuffer, !hasTrailers); + if (hasTrailers) + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + else + { + dataFrame = new DataFrame(contentBuffer, false); + } } else { - headersFrame = new HeadersFrame(metaData, null, false); - promise = new HeadersPromise(request, callback, stream -> - sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback)); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + headersFrame = new HeadersFrame(metaData, null, !hasTrailers); + if (hasTrailers) + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + else + { + headersFrame = new HeadersFrame(metaData, null, false); + } } } - // TODO optimize the send of HEADERS and DATA frames. + HttpChannelOverHTTP2 channel = getHttpChannel(); - channel.getSession().newStream(headersFrame, promise, channel.getStreamListener()); + IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame); + ((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener()); } private String relativize(String path) @@ -140,29 +155,30 @@ public class HttpSenderOverHTTP2 extends HttpSender } } + private HttpFields retrieveTrailers(HttpRequest request) + { + Supplier trailerSupplier = request.getTrailers(); + HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); + return trailers == null || trailers.size() == 0 ? null : trailers; + } + @Override protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback) { Stream stream = getHttpChannel().getStream(); - Supplier trailerSupplier = exchange.getRequest().getTrailers(); - sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback); - } - - private void sendContent(Stream stream, ByteBuffer buffer, boolean lastContent, Supplier trailerSupplier, Callback callback) - { - boolean hasContent = buffer.hasRemaining(); + boolean hasContent = contentBuffer.hasRemaining(); if (lastContent) { // Call the trailers supplier as late as possible. - HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); + HttpFields trailers = retrieveTrailers(exchange.getRequest()); boolean hasTrailers = trailers != null && trailers.size() > 0; if (hasContent) { - DataFrame dataFrame = new DataFrame(stream.getId(), buffer, !hasTrailers); - Callback dataCallback = callback; + DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, !hasTrailers); if (hasTrailers) - dataCallback = Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed); - stream.data(dataFrame, dataCallback); + stream.data(dataFrame, Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed)); + else + stream.data(dataFrame, callback); } else { @@ -172,7 +188,7 @@ public class HttpSenderOverHTTP2 extends HttpSender } else { - DataFrame dataFrame = new DataFrame(stream.getId(), buffer, true); + DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, true); stream.data(dataFrame, callback); } } @@ -181,7 +197,7 @@ public class HttpSenderOverHTTP2 extends HttpSender { if (hasContent) { - DataFrame dataFrame = new DataFrame(stream.getId(), buffer, false); + DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, false); stream.data(dataFrame, callback); } else @@ -203,13 +219,11 @@ public class HttpSenderOverHTTP2 extends HttpSender { private final HttpRequest request; private final Callback callback; - private final Consumer succeed; - private HeadersPromise(HttpRequest request, Callback callback, Consumer succeed) + private HeadersPromise(HttpRequest request, Callback callback) { this.request = request; this.callback = callback; - this.succeed = succeed; } @Override @@ -218,7 +232,7 @@ public class HttpSenderOverHTTP2 extends HttpSender long idleTimeout = request.getIdleTimeout(); if (idleTimeout >= 0) stream.setIdleTimeout(idleTimeout); - succeed.accept(stream); + callback.succeeded(); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 2385500dc71..4e23fd014f8 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2.server; import java.util.Collections; +import java.util.List; import java.util.Map; import org.eclipse.jetty.http.MetaData; @@ -73,9 +74,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis } if (windowFrame == null) - frames(null, Callback.NOOP, settingsFrame, Frame.EMPTY_ARRAY); + frames(null, List.of(settingsFrame), Callback.NOOP); else - frames(null, Callback.NOOP, settingsFrame, windowFrame); + frames(null, List.of(settingsFrame, windowFrame), Callback.NOOP); } @Override diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 66ab155b4db..893cf5f885a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -84,158 +84,172 @@ public class HttpTransportOverHTTP2 implements HttpTransport } @Override - public void send(MetaData.Request request, final MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) + public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) { + if (response != null) + sendHeaders(request, response, content, lastContent, callback); + else + sendContent(request, content, lastContent, callback); + } + + public void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) + { + metaData = response; + + HeadersFrame headersFrame; + DataFrame dataFrame = null; + HeadersFrame trailersFrame = null; + boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod()); boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; - if (response != null) + int status = response.getStatus(); + boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102; + if (interimResponse) { - metaData = response; - int status = response.getStatus(); - boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102; - if (interimResponse) + // Must not commit interim responses. + if (hasContent) { - // Must not commit interim responses. + callback.failed(new IllegalStateException("Interim response cannot have content")); + return; + } + headersFrame = new HeadersFrame(stream.getId(), metaData, null, false); + } + else + { + if (commit.compareAndSet(false, true)) + { + if (lastContent) + { + long realContentLength = BufferUtil.length(content); + long contentLength = response.getContentLength(); + if (contentLength < 0) + { + metaData = new MetaData.Response( + response.getHttpVersion(), + response.getStatus(), + response.getReason(), + response.getFields(), + realContentLength, + response.getTrailerSupplier() + ); + } + else if (hasContent && contentLength != realContentLength) + { + callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength))); + return; + } + } + if (hasContent) { - callback.failed(new IllegalStateException("Interim response cannot have content")); + headersFrame = new HeadersFrame(stream.getId(), metaData, null, false); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + dataFrame = new DataFrame(stream.getId(), content, true); + } + else + { + dataFrame = new DataFrame(stream.getId(), content, false); + trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + } + else + { + dataFrame = new DataFrame(stream.getId(), content, false); + } } else { - transportCallback.send(callback, false, c -> - sendHeadersFrame(metaData, false, c)); + if (lastContent) + { + if (isTunnel(request, metaData)) + { + headersFrame = new HeadersFrame(stream.getId(), metaData, null, false); + } + else + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + headersFrame = new HeadersFrame(stream.getId(), metaData, null, true); + } + else + { + headersFrame = new HeadersFrame(stream.getId(), metaData, null, false); + trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true); + } + } + } + else + { + headersFrame = new HeadersFrame(stream.getId(), metaData, null, false); + } } } else { - if (commit.compareAndSet(false, true)) - { - if (lastContent) - { - long realContentLength = BufferUtil.length(content); - long contentLength = response.getContentLength(); - if (contentLength < 0) - { - metaData = new MetaData.Response( - response.getHttpVersion(), - response.getStatus(), - response.getReason(), - response.getFields(), - realContentLength, - response.getTrailerSupplier() - ); - } - else if (hasContent && contentLength != realContentLength) - { - callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength))); - return; - } - } + callback.failed(new IllegalStateException("committed")); + return; + } + } - if (hasContent) - { - Callback commitCallback = new Callback.Nested(callback) - { - @Override - public void succeeded() - { - if (lastContent) - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - transportCallback.send(new SendTrailers(getCallback(), trailers), false, c -> - sendDataFrame(content, true, false, c)); - } - else - { - transportCallback.send(getCallback(), false, c -> - sendDataFrame(content, true, true, c)); - } - } - else - { - transportCallback.send(getCallback(), false, c -> - sendDataFrame(content, false, false, c)); - } - } - }; - transportCallback.send(commitCallback, true, c -> - sendHeadersFrame(metaData, false, c)); - } - else - { - if (lastContent) - { - if (isTunnel(request, metaData)) - { - transportCallback.send(callback, true, c -> - sendHeadersFrame(metaData, false, c)); - } - else - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - transportCallback.send(new SendTrailers(callback, trailers), true, c -> - sendHeadersFrame(metaData, false, c)); - } - else - { - transportCallback.send(callback, true, c -> - sendHeadersFrame(metaData, true, c)); - } - } - } - else - { - transportCallback.send(callback, true, c -> - sendHeadersFrame(metaData, false, c)); - } - } + HeadersFrame hf = headersFrame; + DataFrame df = dataFrame; + HeadersFrame tf = trailersFrame; + + transportCallback.send(callback, true, c -> + { + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + System.lineSeparator(), HttpVersion.HTTP_2, metaData.getStatus(), + System.lineSeparator(), metaData.getFields()); + } + stream.send(new IStream.FrameList(hf, df, tf), c); + }); + } + + public void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback) + { + boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod()); + boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + if (hasContent || (lastContent && !isTunnel(request, metaData))) + { + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, true, true, c)); } else { - callback.failed(new IllegalStateException("committed")); + SendTrailers sendTrailers = new SendTrailers(callback, trailers); + if (hasContent) + { + transportCallback.send(sendTrailers, false, c -> + sendDataFrame(content, true, false, c)); + } + else + { + sendTrailers.succeeded(); + } } } + else + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, false, false, c)); + } } else { - if (hasContent || (lastContent && !isTunnel(request, metaData))) - { - if (lastContent) - { - HttpFields trailers = retrieveTrailers(); - if (trailers != null) - { - SendTrailers sendTrailers = new SendTrailers(callback, trailers); - if (hasContent) - { - transportCallback.send(sendTrailers, false, c -> - sendDataFrame(content, true, false, c)); - } - else - { - sendTrailers.succeeded(); - } - } - else - { - transportCallback.send(callback, false, c -> - sendDataFrame(content, true, true, c)); - } - } - else - { - transportCallback.send(callback, false, c -> - sendDataFrame(content, false, false, c)); - } - } - else - { - callback.succeeded(); - } + callback.succeeded(); } } @@ -291,20 +305,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport }, new Stream.Listener.Adapter()); // TODO: handle reset from the client ? } - private void sendHeadersFrame(MetaData.Response info, boolean endStream, Callback callback) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}", - stream.getId(), Integer.toHexString(stream.getSession().hashCode()), - System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(), - System.lineSeparator(), info.getFields()); - } - - HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream); - stream.headers(frame, callback); - } - private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback) { if (LOG.isDebugEnabled())