Fixes #3766 - Introduce HTTP/2 API to batch frames. (#5222)

* Fixes #3766 - Introduce HTTP/2 API to batch frames.

Introduced Stream.FrameList to hold HEADERS+DATA+HEADERS frames.
These are often used by the client and by the server when the
request/response content is known and FrameList will allow to
send them in a single TCP write, rather than multiple ones.

Rewritten HttpSenderOverHTTP2.sendHeaders() and
HttpTransportOverHTTP2.sendHeaders() to take advantage of
FrameList.

Now using ConcurrentHashMap as a client context, because
with DEBUG logging enabled it may be access concurrently.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-09-10 10:13:14 +02:00 committed by GitHub
parent 1256261cda
commit f81bf7f945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 491 additions and 290 deletions

View File

@ -30,7 +30,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -603,7 +602,8 @@ public class HttpClient extends ContainerLifeCycle
@Override
public void succeeded(List<InetSocketAddress> socketAddresses)
{
Map<String, Object> context = new HashMap<>();
// Multiple threads may access the map, especially with DEBUG logging enabled.
Map<String, Object> context = new ConcurrentHashMap<>();
context.put(ClientConnectionFactory.CONNECTOR_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
connect(socketAddresses, 0, context);

View File

@ -25,9 +25,9 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
@ -433,7 +433,7 @@ public class HTTP2Client extends ContainerLifeCycle
private Map<String, Object> contextFrom(SslContextFactory sslContextFactory, InetSocketAddress address, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{
if (context == null)
context = new HashMap<>();
context = new ConcurrentHashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.client;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
@ -127,11 +128,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
if (windowDelta > 0)
{
session.updateRecvWindow(windowDelta);
session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
}
else
{
session.frames(null, this, prefaceFrame, settingsFrame);
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame), this);
}
}

View File

@ -27,6 +27,7 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
@ -63,6 +64,7 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
@ -198,14 +200,15 @@ public class StreamResetTest extends AbstractTest
{
// Simulate that there is pending data to send.
IStream stream = (IStream)s;
stream.getSession().frames(stream, new Callback()
List<Frame> frames = Collections.singletonList(new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
stream.getSession().frames(stream, frames, new Callback()
{
@Override
public void failed(Throwable x)
{
serverResetLatch.countDown();
}
}, new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
});
}
};
}

View File

@ -18,11 +18,11 @@
package org.eclipse.jetty.http2;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
@ -160,7 +160,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
session.frames(stream, Collections.singletonList(frame), Callback.NOOP);
}
@Override

View File

@ -113,6 +113,25 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
return false;
}
public boolean append(List<Entry> list)
{
Throwable closed;
synchronized (this)
{
closed = terminated;
if (closed == null)
{
list.forEach(entries::offer);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, entries={}", list, entries.size());
}
}
if (closed == null)
return true;
list.forEach(entry -> closed(entry, closed));
return false;
}
private int getWindowQueueSize()
{
synchronized (this)
@ -414,6 +433,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
public abstract long onFlushed(long bytes) throws IOException;
boolean hasHighPriority()
{
return false;
}
@Override
public void failed(Throwable x)
{

View File

@ -23,8 +23,10 @@ import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -49,6 +52,7 @@ import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.hpack.HpackException;
@ -585,7 +589,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frame, promise, listener);
newStream(new IStream.FrameList(frame), promise, listener);
}
@Override
public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frames, promise, listener);
}
@Override
@ -692,46 +702,48 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void control(IStream stream, Callback callback, Frame frame)
{
frames(stream, callback, frame, Frame.EMPTY_ARRAY);
frames(stream, Collections.singletonList(frame), callback);
}
@Override
public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
public void frames(IStream stream, List<? extends Frame> frames, Callback callback)
{
// We want to generate as late as possible to allow re-prioritization;
// generation will happen while processing the entries.
// The callback needs to be notified only when the last frame completes.
int length = frames.length;
if (length == 0)
int count = frames.size();
if (count > 1)
callback = new CountingCallback(callback, count);
for (int i = 1; i <= count; ++i)
{
frame(new ControlEntry(frame, stream, callback), true);
}
else
{
callback = new CountingCallback(callback, 1 + length);
frame(new ControlEntry(frame, stream, callback), false);
for (int i = 1; i <= length; ++i)
{
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
}
Frame frame = frames.get(i - 1);
HTTP2Flusher.Entry entry = newEntry(frame, stream, callback);
frame(entry, i == count);
}
}
private HTTP2Flusher.Entry newEntry(Frame frame, IStream stream, Callback callback)
{
return frame.getType() == FrameType.DATA
? new DataEntry((DataFrame)frame, stream, callback)
: new ControlEntry(frame, stream, callback);
}
@Override
public void data(IStream stream, Callback callback, DataFrame frame)
{
// We want to generate as late as possible to allow re-prioritization.
frame(new DataEntry(frame, stream, callback), true);
frame(newEntry(frame, stream, callback), true);
}
private void frame(HTTP2Flusher.Entry entry, boolean flush)
{
if (LOG.isDebugEnabled())
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this);
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry, this);
// Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
boolean queued = entry.hasHighPriority() ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush)
{
if (entry.stream != null)
@ -1278,6 +1290,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
@Override
boolean hasHighPriority()
{
return frame.getType() == FrameType.PING;
}
@Override
public void succeeded()
{
@ -1613,7 +1631,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void complete()
{
frames(null, Callback.NOOP, newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame());
frames(null, Arrays.asList(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP);
}
}
@ -1672,29 +1690,30 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int streamId = reserveSlot(slot, currentStreamId);
if (currentStreamId <= 0)
frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive());
frame = frame.withStreamId(streamId);
slot.entry = new ControlEntry(frame, null, callback);
slot.entries = Collections.singletonList(newEntry(frame, null, callback));
flush();
return streamId;
}
private void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
private void newStream(IStream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int currentStreamId = frame.getStreamId();
int currentStreamId = frameList.getStreamId();
int streamId = reserveSlot(slot, currentStreamId);
List<StreamFrame> frames = frameList.getFrames();
if (currentStreamId <= 0)
{
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
frames = frames.stream()
.map(frame -> frame.withStreamId(streamId))
.collect(Collectors.toList());
}
try
{
createLocalStream(slot, frame, promise, listener, streamId);
createLocalStream(slot, frames, promise, listener, streamId);
}
catch (Throwable x)
{
@ -1706,11 +1725,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
Slot slot = new Slot();
int streamId = reserveSlot(slot, 0);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
frame = frame.withStreamId(streamId);
try
{
createLocalStream(slot, frame, promise, listener, streamId);
createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId);
}
catch (Throwable x)
{
@ -1738,11 +1757,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return streamId;
}
private void createLocalStream(Slot slot, Frame frame, Promise<Stream> promise, Stream.Listener listener, int streamId)
private void createLocalStream(Slot slot, List<StreamFrame> frames, Promise<Stream> promise, Stream.Listener listener, int streamId)
{
IStream stream = HTTP2Session.this.createLocalStream(streamId);
stream.setListener(listener);
slot.entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
int count = frames.size();
Callback streamCallback = new StreamPromiseCallback(promise, stream);
Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count);
slot.entries = frames.stream()
.map(frame -> newEntry(frame, stream, callback))
.collect(Collectors.toList());
flush();
}
@ -1757,16 +1781,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
/**
* Flush goes over the entries of the slots queue to flush the entries,
* until either one of the following two conditions is true:
* - The queue is empty.
* - It reaches a slot with a null entry.
* When a slot with a null entry is encountered, this means a concurrent thread reserved a slot
* but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering
* the null entry must bail out and it is up to the concurrent thread to finish up flushing.
* Note that only one thread can flush at any one time, if two threads happen to call flush
* concurrently, one will do the work while the other will bail out, so it is safe that all
* threads call flush after they're done reserving a slot and setting the entry.
* <p>Iterates over the entries of the slot queue to flush them.</p>
* <p>The flush proceeds until either one of the following two conditions is true:</p>
* <ul>
* <li>the queue is empty</li>
* <li>a slot with a no entries is encountered</li>
* </ul>
* <p>When a slot with a no entries is encountered, then it means that a concurrent thread reserved
* a slot but hasn't set its entries yet. Since slots must be flushed in order, the thread encountering
* the slot with no entries must bail out and it is up to the concurrent thread to finish up flushing.</p>
* <p>Note that only one thread can flush at any time; if two threads happen to call this method
* concurrently, one will do the work while the other will bail out, so it is safe that all
* threads call this method after they are done reserving a slot and setting the entries.</p>
*/
private void flush()
{
@ -1774,7 +1800,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
boolean queued = false;
while (true)
{
ControlEntry entry;
List<HTTP2Flusher.Entry> entries;
synchronized (this)
{
if (flushing == null)
@ -1783,17 +1809,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return; // another thread is flushing
Slot slot = slots.peek();
entry = slot == null ? null : slot.entry;
entries = slot == null ? null : slot.entries;
if (entry == null)
if (entries == null)
{
flushing = null;
break; // No more slots or null entry, so we may iterate on the flusher
break; // No more slots or null entries, so we may iterate on the flusher
}
slots.poll();
}
queued |= flusher.append(entry);
queued |= flusher.append(entries);
}
if (queued)
flusher.iterate();
@ -1801,7 +1827,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class Slot
{
private volatile ControlEntry entry;
private volatile List<HTTP2Flusher.Entry> entries;
}
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -108,10 +109,16 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
@Override
public void headers(HeadersFrame frame, Callback callback)
public void send(FrameList frameList, Callback callback)
{
if (startWrite(callback))
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
session.frames(this, frameList.getFrames(), this);
}
@Override
public void headers(HeadersFrame frame, Callback callback)
{
send(new FrameList(frame), callback);
}
@Override
@ -137,7 +144,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
session.frames(this, Collections.singletonList(frame), callback);
}
private boolean startWrite(Callback callback)

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
@ -47,18 +48,25 @@ public interface ISession extends Session
void removeStream(IStream stream);
/**
* <p>Enqueues the given frames to be written to the connection.</p>
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
*
* @param stream the stream the frames belong to
* @param callback the callback that gets notified when the frames have been sent
* @param frame the first frame to enqueue
* @param frames additional frames to enqueue
* @param frames the list of frames to send
* @param promise the promise that gets notified of the stream creation
* @param listener the listener that gets notified of stream events
*/
void frames(IStream stream, Callback callback, Frame frame, Frame... frames);
void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener);
/**
* <p>Enqueues the given frames to be written to the connection.</p>
* @param stream the stream the frames belong to
* @param frames additional frames to enqueue
* @param callback the callback that gets notified when the frames have been sent
*/
void frames(IStream stream, List<? extends Frame> frames, Callback callback);
/**
* <p>Enqueues the given PUSH_PROMISE frame to be written to the connection.</p>
* <p>Differently from {@link #frames(IStream, Callback, Frame, Frame...)}, this method
* <p>Differently from {@link #frames(IStream, List, Callback)}, this method
* generates atomically the stream id for the pushed stream.</p>
*
* @param stream the stream associated to the pushed stream

View File

@ -19,9 +19,16 @@
package org.eclipse.jetty.http2;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable
*/
void setListener(Listener listener);
/**
* <p>Sends the given list of frames.</p>
* <p>Typically used to send an HTTP response along with content and possibly trailers.</p>
*
* @param frameList the list of frames to send
* @param callback the callback that gets notified when the frames have been sent
*/
void send(FrameList frameList, Callback callback);
/**
* <p>Processes the given {@code frame}, belonging to this stream.</p>
*
@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
public static class FrameList
{
private final List<StreamFrame> frames;
/**
* <p>Creates a frame list of just the given HEADERS frame.</p>
*
* @param headers the HEADERS frame
*/
public FrameList(HeadersFrame headers)
{
Objects.requireNonNull(headers);
this.frames = Collections.singletonList(headers);
}
/**
* <p>Creates a frame list of the given frames.</p>
*
* @param headers the HEADERS frame for the headers
* @param data the DATA frame for the content, or null if there is no content
* @param trailers the HEADERS frame for the trailers, or null if there are no trailers
*/
public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers)
{
Objects.requireNonNull(headers);
ArrayList<StreamFrame> frames = new ArrayList<>(3);
int streamId = headers.getStreamId();
if (data != null && data.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data);
if (trailers != null && trailers.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers);
frames.add(headers);
if (data != null)
frames.add(data);
if (trailers != null)
frames.add(trailers);
this.frames = Collections.unmodifiableList(frames);
}
/**
* @return the stream ID of the frames in this list
*/
public int getStreamId()
{
return frames.get(0).getStreamId();
}
/**
* @return a List of non-null frames
*/
public List<StreamFrame> getFrames()
{
return frames;
}
}
}

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.http2;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback;
@ -44,12 +47,13 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
// This method is called when a whole flow controlled frame has been consumed.
// We send a WindowUpdate every time, even if the frame was very small.
List<Frame> frames = new ArrayList<>(2);
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
frames.add(sessionFrame);
session.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
Frame[] streamFrame = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isRemotelyClosed())
@ -59,14 +63,14 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
}
else
{
streamFrame = new Frame[1];
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
WindowUpdateFrame streamFrame = new WindowUpdateFrame(stream.getId(), length);
frames.add(streamFrame);
stream.updateRecvWindow(length);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
}
}
session.frames(stream, Callback.NOOP, sessionFrame, streamFrame);
session.frames(stream, frames, Callback.NOOP);
}
}

View File

@ -51,7 +51,9 @@ public interface Stream
Session getSession();
/**
* <p>Sends the given HEADERS {@code frame} representing an HTTP response.</p>
* <p>Sends the given HEADERS {@code frame}.</p>
* <p>Typically used to send an HTTP response with no content and no trailers,
* or to send the HTTP response trailers.</p>
*
* @param frame the HEADERS frame to send
* @param callback the callback that gets notified when the frame has been sent

View File

@ -20,13 +20,17 @@ package org.eclipse.jetty.http2.frames;
import java.nio.ByteBuffer;
public class DataFrame extends Frame
public class DataFrame extends StreamFrame
{
private final int streamId;
private final ByteBuffer data;
private final boolean endStream;
private final int padding;
public DataFrame(ByteBuffer data, boolean endStream)
{
this(0, data, endStream);
}
public DataFrame(int streamId, ByteBuffer data, boolean endStream)
{
this(streamId, data, endStream, 0);
@ -34,18 +38,12 @@ public class DataFrame extends Frame
public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding)
{
super(FrameType.DATA);
this.streamId = streamId;
super(FrameType.DATA, streamId);
this.data = data;
this.endStream = endStream;
this.padding = padding;
}
public int getStreamId()
{
return streamId;
}
public ByteBuffer getData()
{
return data;
@ -72,9 +70,15 @@ public class DataFrame extends Frame
return padding;
}
@Override
public DataFrame withStreamId(int streamId)
{
return new DataFrame(streamId, getData(), isEndStream());
}
@Override
public String toString()
{
return String.format("%s#%d{length:%d,end=%b}", super.toString(), streamId, data.remaining(), endStream);
return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), data.remaining(), endStream);
}
}

View File

@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
import org.eclipse.jetty.http.MetaData;
public class HeadersFrame extends Frame
public class HeadersFrame extends StreamFrame
{
private final int streamId;
private final MetaData metaData;
private final PriorityFrame priority;
private final boolean endStream;
@ -53,18 +52,12 @@ public class HeadersFrame extends Frame
*/
public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream)
{
super(FrameType.HEADERS);
this.streamId = streamId;
super(FrameType.HEADERS, streamId);
this.metaData = metaData;
this.priority = priority;
this.endStream = endStream;
}
public int getStreamId()
{
return streamId;
}
public MetaData getMetaData()
{
return metaData;
@ -80,10 +73,18 @@ public class HeadersFrame extends Frame
return endStream;
}
@Override
public HeadersFrame withStreamId(int streamId)
{
PriorityFrame priority = getPriority();
priority = priority == null ? null : priority.withStreamId(streamId);
return new HeadersFrame(streamId, getMetaData(), priority, isEndStream());
}
@Override
public String toString()
{
return String.format("%s#%d{end=%b}%s", super.toString(), streamId, endStream,
return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
priority == null ? "" : String.format("+%s", priority));
}
}

View File

@ -18,11 +18,10 @@
package org.eclipse.jetty.http2.frames;
public class PriorityFrame extends Frame
public class PriorityFrame extends StreamFrame
{
public static final int PRIORITY_LENGTH = 5;
private final int streamId;
private final int parentStreamId;
private final int weight;
private final boolean exclusive;
@ -34,20 +33,14 @@ public class PriorityFrame extends Frame
public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive)
{
super(FrameType.PRIORITY);
this.streamId = streamId;
super(FrameType.PRIORITY, streamId);
this.parentStreamId = parentStreamId;
this.weight = weight;
this.exclusive = exclusive;
}
public int getStreamId()
{
return streamId;
}
/**
* @return <code>int</code> of the Parent Stream
* @return {@code int} of the Parent Stream
* @deprecated use {@link #getParentStreamId()} instead.
*/
@Deprecated
@ -71,9 +64,15 @@ public class PriorityFrame extends Frame
return exclusive;
}
@Override
public PriorityFrame withStreamId(int streamId)
{
return new PriorityFrame(streamId, getParentStreamId(), getWeight(), isExclusive());
}
@Override
public String toString()
{
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive);
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), getStreamId(), parentStreamId, weight, exclusive);
}
}

View File

@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
import org.eclipse.jetty.http.MetaData;
public class PushPromiseFrame extends Frame
public class PushPromiseFrame extends StreamFrame
{
private final int streamId;
private final int promisedStreamId;
private final MetaData metaData;
@ -33,17 +32,11 @@ public class PushPromiseFrame extends Frame
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData metaData)
{
super(FrameType.PUSH_PROMISE);
this.streamId = streamId;
super(FrameType.PUSH_PROMISE, streamId);
this.promisedStreamId = promisedStreamId;
this.metaData = metaData;
}
public int getStreamId()
{
return streamId;
}
public int getPromisedStreamId()
{
return promisedStreamId;
@ -54,9 +47,15 @@ public class PushPromiseFrame extends Frame
return metaData;
}
@Override
public PushPromiseFrame withStreamId(int streamId)
{
return new PushPromiseFrame(getStreamId(), streamId, getMetaData());
}
@Override
public String toString()
{
return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId);
return String.format("%s#%d/#%d", super.toString(), getStreamId(), promisedStreamId);
}
}

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.frames;
public abstract class StreamFrame extends Frame
{
private final int streamId;
public StreamFrame(FrameType type, int streamId)
{
super(type);
this.streamId = streamId;
}
public int getStreamId()
{
return streamId;
}
public abstract StreamFrame withStreamId(int streamId);
}

View File

@ -29,10 +29,12 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -50,64 +52,62 @@ public class HttpSenderOverHTTP2 extends HttpSender
}
@Override
protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback)
protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
{
HttpRequest request = exchange.getRequest();
String path = relativize(request.getPath());
HttpURI uri = HttpURI.createHttpURI(request.getScheme(), request.getHost(), request.getPort(), path, null, request.getQuery(), null);
MetaData.Request metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_2, request.getHeaders());
Supplier<HttpFields> trailerSupplier = request.getTrailers();
metaData.setTrailerSupplier(trailerSupplier);
metaData.setTrailerSupplier(request.getTrailers());
HeadersFrame headersFrame;
Promise<Stream> promise;
DataFrame dataFrame = null;
HeadersFrame trailersFrame = null;
if (content.hasContent())
{
headersFrame = new HeadersFrame(metaData, null, false);
promise = new HeadersPromise(request, callback)
if (!expects100Continue(request))
{
@Override
public void succeeded(Stream stream)
boolean advanced = content.advance();
boolean lastContent = content.isLast();
if (advanced)
{
super.succeeded(stream);
if (expects100Continue(request))
if (lastContent)
{
// Don't send the content yet.
callback.succeeded();
HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
dataFrame = new DataFrame(content.getByteBuffer(), !hasTrailers);
if (hasTrailers)
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
else
{
boolean advanced = content.advance();
boolean lastContent = content.isLast();
if (advanced || lastContent)
sendContent(stream, content, trailerSupplier, callback);
else
callback.succeeded();
dataFrame = new DataFrame(content.getByteBuffer(), false);
}
}
};
else if (lastContent)
{
HttpFields trailers = retrieveTrailers(request);
if (trailers != null)
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
else
dataFrame = new DataFrame(BufferUtil.EMPTY_BUFFER, true);
}
}
}
else
{
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
boolean endStream = trailers == null || trailers.size() == 0;
headersFrame = new HeadersFrame(metaData, null, endStream);
promise = new HeadersPromise(request, callback)
{
@Override
public void succeeded(Stream stream)
{
super.succeeded(stream);
if (endStream)
callback.succeeded();
else
sendTrailers(stream, trailers, callback);
}
};
HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
headersFrame = new HeadersFrame(metaData, null, !hasTrailers);
if (hasTrailers)
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
// TODO optimize the send of HEADERS and DATA frames.
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame);
((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
}
private String relativize(String path)
@ -128,6 +128,13 @@ public class HttpSenderOverHTTP2 extends HttpSender
}
}
private HttpFields retrieveTrailers(HttpRequest request)
{
Supplier<HttpFields> trailerSupplier = request.getTrailers();
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
return trailers == null || trailers.size() == 0 ? null : trailers;
}
@Override
protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback)
{
@ -140,25 +147,27 @@ public class HttpSenderOverHTTP2 extends HttpSender
}
else
{
Stream stream = getHttpChannel().getStream();
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers();
sendContent(stream, content, trailerSupplier, callback);
sendContent(getHttpChannel().getStream(), exchange.getRequest(), content, callback);
}
}
private void sendContent(Stream stream, HttpContent content, Supplier<HttpFields> trailerSupplier, Callback callback)
private void sendContent(Stream stream, HttpRequest request, HttpContent content, Callback callback)
{
boolean lastContent = content.isLast();
HttpFields trailers = null;
boolean endStream = false;
if (lastContent)
if (content.isLast())
{
trailers = trailerSupplier == null ? null : trailerSupplier.get();
endStream = trailers == null || trailers.size() == 0;
HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), !hasTrailers);
if (hasTrailers)
stream.data(dataFrame, Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed));
else
stream.data(dataFrame, callback);
}
else
{
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), false);
stream.data(dataFrame, callback);
}
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), endStream);
HttpFields fTrailers = trailers;
stream.data(dataFrame, endStream || !lastContent ? callback : Callback.from(() -> sendTrailers(stream, fTrailers, callback), callback::failed));
}
private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
@ -188,6 +197,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
callback.succeeded();
}
@Override

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.server;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@ -72,9 +73,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
if (windowFrame == null)
frames(null, Callback.NOOP, settingsFrame, Frame.EMPTY_ARRAY);
frames(null, Collections.singletonList(settingsFrame), Callback.NOOP);
else
frames(null, Callback.NOOP, settingsFrame, windowFrame);
frames(null, Arrays.asList(settingsFrame, windowFrame), Callback.NOOP);
}
@Override

View File

@ -89,140 +89,154 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void send(MetaData.Response info, boolean isHeadRequest, ByteBuffer content, boolean lastContent, Callback callback)
{
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
if (info != null)
sendHeaders(info, content, lastContent, isHeadRequest, callback);
else
sendContent(content, lastContent, isHeadRequest, callback);
}
private void sendHeaders(MetaData.Response info, ByteBuffer content, boolean lastContent, boolean isHeadRequest, Callback callback)
{
metaData = info;
HeadersFrame headersFrame;
DataFrame dataFrame = null;
HeadersFrame trailersFrame = null;
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
int status = info.getStatus();
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
if (interimResponse)
{
metaData = info;
int status = info.getStatus();
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
if (interimResponse)
// Must not commit interim responses.
if (hasContent)
{
// Must not commit interim responses.
callback.failed(new IllegalStateException("Interim response cannot have content"));
return;
}
headersFrame = new HeadersFrame(stream.getId(), info, null, false);
}
else
{
if (commit.compareAndSet(false, true))
{
if (lastContent)
{
long realContentLength = BufferUtil.length(content);
long contentLength = info.getContentLength();
if (contentLength < 0)
{
info.setContentLength(realContentLength);
}
else if (hasContent && contentLength != realContentLength)
{
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
return;
}
}
if (hasContent)
{
callback.failed(new IllegalStateException("Interim response cannot have content"));
headersFrame = new HeadersFrame(stream.getId(), info, null, false);
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
dataFrame = new DataFrame(stream.getId(), content, true);
}
else
{
dataFrame = new DataFrame(stream.getId(), content, false);
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
}
else
{
dataFrame = new DataFrame(stream.getId(), content, false);
}
}
else
{
transportCallback.send(callback, false, c ->
sendHeadersFrame(info, false, c));
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
headersFrame = new HeadersFrame(stream.getId(), info, null, true);
}
else
{
headersFrame = new HeadersFrame(stream.getId(), info, null, false);
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
}
else
{
headersFrame = new HeadersFrame(stream.getId(), info, null, false);
}
}
}
else
{
if (commit.compareAndSet(false, true))
{
if (lastContent)
{
long realContentLength = BufferUtil.length(content);
long contentLength = info.getContentLength();
if (contentLength < 0)
{
info.setContentLength(realContentLength);
}
else if (hasContent && contentLength != realContentLength)
{
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
return;
}
}
callback.failed(new IllegalStateException("Response already committed"));
return;
}
}
if (hasContent)
{
Callback commitCallback = new Callback.Nested(callback)
{
@Override
public void succeeded()
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
transportCallback.send(new SendTrailers(getCallback(), trailers), false, c ->
sendDataFrame(content, true, false, c));
}
else
{
transportCallback.send(getCallback(), false, c ->
sendDataFrame(content, true, true, c));
}
}
else
{
transportCallback.send(getCallback(), false, c ->
sendDataFrame(content, false, false, c));
}
}
};
transportCallback.send(commitCallback, true, c ->
sendHeadersFrame(info, false, c));
}
else
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
transportCallback.send(new SendTrailers(callback, trailers), true, c ->
sendHeadersFrame(info, false, c));
}
else
{
transportCallback.send(callback, true, c ->
sendHeadersFrame(info, true, c));
}
}
else
{
transportCallback.send(callback, true, c ->
sendHeadersFrame(info, false, c));
}
}
HeadersFrame hf = headersFrame;
DataFrame df = dataFrame;
HeadersFrame tf = trailersFrame;
transportCallback.send(callback, true, c ->
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
System.lineSeparator(), info.getFields());
}
stream.send(new IStream.FrameList(hf, df, tf), c);
});
}
private void sendContent(ByteBuffer content, boolean lastContent, boolean isHeadRequest, Callback callback)
{
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
if (hasContent || lastContent)
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, true, true, c));
}
else
{
callback.failed(new IllegalStateException("committed"));
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
sendTrailers.succeeded();
}
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
}
else
{
if (hasContent || lastContent)
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
sendTrailers.succeeded();
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, true, true, c));
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
}
else
{
callback.succeeded();
}
callback.succeeded();
}
}
@ -273,20 +287,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}, new Stream.Listener.Adapter()); // TODO: handle reset from the client ?
}
private void sendHeadersFrame(MetaData.Response info, boolean endStream, Callback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
System.lineSeparator(), info.getFields());
}
HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream);
stream.headers(frame, callback);
}
private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback)
{
if (LOG.isDebugEnabled())