Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-09-10 13:26:15 +02:00
commit b997410845
20 changed files with 520 additions and 312 deletions

View File

@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -563,7 +562,8 @@ public class HttpClient extends ContainerLifeCycle
@Override @Override
public void succeeded(List<InetSocketAddress> socketAddresses) public void succeeded(List<InetSocketAddress> socketAddresses)
{ {
Map<String, Object> context = new HashMap<>(); // Multiple threads may access the map, especially with DEBUG logging enabled.
Map<String, Object> context = new ConcurrentHashMap<>();
context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this); context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this);
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination); context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
connect(socketAddresses, 0, context); connect(socketAddresses, 0, context);

View File

@ -21,10 +21,10 @@ package org.eclipse.jetty.http2.client;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory; import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
@ -391,7 +391,7 @@ public class HTTP2Client extends ContainerLifeCycle
private Map<String, Object> contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context) private Map<String, Object> contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
{ {
if (context == null) if (context == null)
context = new HashMap<>(); context = new ConcurrentHashMap<>();
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this); context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener); context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise); context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.client; package org.eclipse.jetty.http2.client;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -129,11 +130,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
if (windowDelta > 0) if (windowDelta > 0)
{ {
session.updateRecvWindow(windowDelta); session.updateRecvWindow(windowDelta);
session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); session.frames(null, List.of(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
} }
else else
{ {
session.frames(null, this, prefaceFrame, settingsFrame); session.frames(null, List.of(prefaceFrame, settingsFrame), this);
} }
} }

View File

@ -63,6 +63,7 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
@ -198,14 +199,15 @@ public class StreamResetTest extends AbstractTest
{ {
// Simulate that there is pending data to send. // Simulate that there is pending data to send.
IStream stream = (IStream)s; IStream stream = (IStream)s;
stream.getSession().frames(stream, new Callback() List<Frame> frames = List.of(new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
stream.getSession().frames(stream, frames, new Callback()
{ {
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {
serverResetLatch.countDown(); serverResetLatch.countDown();
} }
}, new DataFrame(s.getId(), ByteBuffer.allocate(16), true)); });
} }
}; };
} }

View File

@ -18,11 +18,11 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics; import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -171,7 +171,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame) protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{ {
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY); session.frames(stream, List.of(frame), Callback.NOOP);
} }
@Override @Override

View File

@ -115,6 +115,25 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
return false; return false;
} }
public boolean append(List<Entry> list)
{
Throwable closed;
synchronized (this)
{
closed = terminated;
if (closed == null)
{
list.forEach(entries::offer);
if (LOG.isDebugEnabled())
LOG.debug("Appended {}, entries={}", list, entries.size());
}
}
if (closed == null)
return true;
list.forEach(entry -> closed(entry, closed));
return false;
}
private int getWindowQueueSize() private int getWindowQueueSize()
{ {
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
@ -416,6 +435,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
public abstract long onFlushed(long bytes) throws IOException; public abstract long onFlushed(long bytes) throws IOException;
boolean hasHighPriority()
{
return false;
}
@Override @Override
public void failed(Throwable x) public void failed(Throwable x)
{ {

View File

@ -26,6 +26,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -36,6 +37,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
@ -53,6 +55,7 @@ import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.hpack.HpackException; import org.eclipse.jetty.http2.hpack.HpackException;
@ -602,7 +605,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
@Override @Override
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener) public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
{ {
streamCreator.newStream(frame, promise, listener); newStream(new IStream.FrameList(frame), promise, listener);
}
@Override
public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener)
{
streamCreator.newStream(frames, promise, listener);
} }
/** /**
@ -617,11 +626,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
*/ */
public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut) public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut)
{ {
int streamId = frameIn.getStreamId(); HeadersFrame frame = frameIn;
int streamId = frame.getStreamId();
if (streamId <= 0) if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2); streamId = localStreamIds.getAndAdd(2);
frame = frame.withStreamId(streamId);
}
HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn);
if (frameOut != null) if (frameOut != null)
frameOut[0] = frame; frameOut[0] = frame;
@ -749,46 +761,48 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void control(IStream stream, Callback callback, Frame frame) private void control(IStream stream, Callback callback, Frame frame)
{ {
frames(stream, callback, frame, Frame.EMPTY_ARRAY); frames(stream, List.of(frame), callback);
} }
@Override @Override
public void frames(IStream stream, Callback callback, Frame frame, Frame... frames) public void frames(IStream stream, List<? extends Frame> frames, Callback callback)
{ {
// We want to generate as late as possible to allow re-prioritization; // We want to generate as late as possible to allow re-prioritization;
// generation will happen while processing the entries. // generation will happen while processing the entries.
// The callback needs to be notified only when the last frame completes. // The callback needs to be notified only when the last frame completes.
int length = frames.length; int count = frames.size();
if (length == 0) if (count > 1)
callback = new CountingCallback(callback, count);
for (int i = 1; i <= count; ++i)
{ {
frame(new ControlEntry(frame, stream, callback), true); Frame frame = frames.get(i - 1);
} HTTP2Flusher.Entry entry = newEntry(frame, stream, callback);
else frame(entry, i == count);
{
callback = new CountingCallback(callback, 1 + length);
frame(new ControlEntry(frame, stream, callback), false);
for (int i = 1; i <= length; ++i)
{
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
}
} }
} }
private HTTP2Flusher.Entry newEntry(Frame frame, IStream stream, Callback callback)
{
return frame.getType() == FrameType.DATA
? new DataEntry((DataFrame)frame, stream, callback)
: new ControlEntry(frame, stream, callback);
}
@Override @Override
public void data(IStream stream, Callback callback, DataFrame frame) public void data(IStream stream, Callback callback, DataFrame frame)
{ {
// We want to generate as late as possible to allow re-prioritization. // We want to generate as late as possible to allow re-prioritization.
frame(new DataEntry(frame, stream, callback), true); frame(newEntry(frame, stream, callback), true);
} }
private void frame(HTTP2Flusher.Entry entry, boolean flush) private void frame(HTTP2Flusher.Entry entry, boolean flush)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this); LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry, this);
// Ping frames are prepended to process them as soon as possible. // Ping frames are prepended to process them as soon as possible.
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); boolean queued = entry.hasHighPriority() ? flusher.prepend(entry) : flusher.append(entry);
if (queued && flush) if (queued && flush)
{ {
if (entry.stream != null) if (entry.stream != null)
@ -1356,6 +1370,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
@Override
boolean hasHighPriority()
{
return frame.getType() == FrameType.PING;
}
@Override @Override
public void succeeded() public void succeeded()
{ {
@ -1700,7 +1720,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void complete() private void complete()
{ {
frames(null, Callback.NOOP, newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()); frames(null, List.of(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP);
} }
} }
@ -1760,26 +1780,40 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int streamId = reserveSlot(slot, currentStreamId); int streamId = reserveSlot(slot, currentStreamId);
if (currentStreamId <= 0) if (currentStreamId <= 0)
frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive()); frame = frame.withStreamId(streamId);
assignSlotAndFlush(slot, new ControlEntry(frame, null, callback)); assignSlotAndFlush(slot, newEntry(frame, null, callback));
return streamId; return streamId;
} }
private void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener) private void newStream(IStream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
{ {
Slot slot = new Slot(); Slot slot = new Slot();
int currentStreamId = frame.getStreamId(); int currentStreamId = frameList.getStreamId();
int streamId = reserveSlot(slot, currentStreamId); int streamId = reserveSlot(slot, currentStreamId);
frame = prepareHeadersFrame(streamId, frame); List<StreamFrame> frames = frameList.getFrames();
if (currentStreamId <= 0)
{
frames = frames.stream()
.map(frame -> frame.withStreamId(streamId))
.collect(Collectors.toList());
}
try try
{ {
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData()); HeadersFrame headersFrame = (HeadersFrame)frameList.getFrames().get(0);
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)headersFrame.getMetaData());
stream.setListener(listener); stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP); stream.process(new PrefaceFrame(), Callback.NOOP);
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
int count = frames.size();
Callback streamCallback = new StreamPromiseCallback(promise, stream);
Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count);
List<HTTP2Flusher.Entry> entries = frames.stream()
.map(frame -> newEntry(frame, stream, callback))
.collect(Collectors.toList());
assignSlotAndFlush(slot, entries);
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -1787,28 +1821,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame)
{
if (frame.getStreamId() <= 0)
{
PriorityFrame priority = frame.getPriority();
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
}
return frame;
}
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener) private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
{ {
Slot slot = new Slot(); Slot slot = new Slot();
int streamId = reserveSlot(slot, 0); int streamId = reserveSlot(slot, 0);
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); frame = frame.withStreamId(streamId);
try try
{ {
IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData()); IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData());
stream.setListener(listener); stream.setListener(listener);
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream))); assignSlotAndFlush(slot, newEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
} }
catch (Throwable x) catch (Throwable x)
{ {
@ -1816,13 +1839,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
private void assignSlotAndFlush(Slot slot, ControlEntry entry)
{
// Every time a slot entry is assigned, we must flush.
slot.entry = entry;
flush();
}
private int reserveSlot(Slot slot, int streamId) private int reserveSlot(Slot slot, int streamId)
{ {
if (streamId <= 0) if (streamId <= 0)
@ -1843,6 +1859,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return streamId; return streamId;
} }
private void assignSlotAndFlush(Slot slot, HTTP2Flusher.Entry entry)
{
assignSlotAndFlush(slot, List.of(entry));
}
private void assignSlotAndFlush(Slot slot, List<HTTP2Flusher.Entry> entries)
{
// Every time a slot entry is assigned, we must flush.
slot.entries = entries;
flush();
}
private void releaseSlotFlushAndFail(Slot slot, Promise<Stream> promise, Throwable x) private void releaseSlotFlushAndFail(Slot slot, Promise<Stream> promise, Throwable x)
{ {
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
@ -1854,16 +1882,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
/** /**
* Flush goes over the entries of the slots queue to flush the entries, * <p>Iterates over the entries of the slot queue to flush them.</p>
* until either one of the following two conditions is true: * <p>The flush proceeds until either one of the following two conditions is true:</p>
* - The queue is empty. * <ul>
* - It reaches a slot with a null entry. * <li>the queue is empty</li>
* When a slot with a null entry is encountered, this means a concurrent thread reserved a slot * <li>a slot with a no entries is encountered</li>
* but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering * </ul>
* the null entry must bail out and it is up to the concurrent thread to finish up flushing. * <p>When a slot with a no entries is encountered, then it means that a concurrent thread reserved
* Note that only one thread can flush at any one time, if two threads happen to call flush * a slot but hasn't set its entries yet. Since slots must be flushed in order, the thread encountering
* concurrently, one will do the work while the other will bail out, so it is safe that all * the slot with no entries must bail out and it is up to the concurrent thread to finish up flushing.</p>
* threads call flush after they're done reserving a slot and setting the entry. * <p>Note that only one thread can flush at any time; if two threads happen to call this method
* concurrently, one will do the work while the other will bail out, so it is safe that all
* threads call this method after they are done reserving a slot and setting the entries.</p>
*/ */
private void flush() private void flush()
{ {
@ -1871,7 +1901,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
boolean queued = false; boolean queued = false;
while (true) while (true)
{ {
ControlEntry entry; List<HTTP2Flusher.Entry> entries;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
if (flushing == null) if (flushing == null)
@ -1880,18 +1910,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
return; // Another thread is flushing. return; // Another thread is flushing.
Slot slot = slots.peek(); Slot slot = slots.peek();
entry = slot == null ? null : slot.entry; entries = slot == null ? null : slot.entries;
if (entry == null) if (entries == null)
{ {
flushing = null; flushing = null;
// No more slots or null entry, so we may iterate on the flusher. // No more slots or null entries, so we may iterate on the flusher.
break; break;
} }
slots.poll(); slots.poll();
} }
queued |= flusher.append(entry); queued |= flusher.append(entries);
} }
if (queued) if (queued)
flusher.iterate(); flusher.iterate();
@ -1899,7 +1929,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class Slot private class Slot
{ {
private volatile ControlEntry entry; private volatile List<HTTP2Flusher.Entry> entries;
} }
} }
} }

View File

@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -122,9 +123,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override @Override
public void headers(HeadersFrame frame, Callback callback) public void headers(HeadersFrame frame, Callback callback)
{
send(new FrameList(frame), callback);
}
@Override
public void send(FrameList frameList, Callback callback)
{ {
if (startWrite(callback)) if (startWrite(callback))
session.frames(this, this, frame, Frame.EMPTY_ARRAY); session.frames(this, frameList.getFrames(), this);
} }
@Override @Override
@ -150,7 +157,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
localReset = true; localReset = true;
failure = new EOFException("reset"); failure = new EOFException("reset");
} }
session.frames(this, callback, frame, Frame.EMPTY_ARRAY); session.frames(this, List.of(frame), callback);
} }
private boolean startWrite(Callback callback) private boolean startWrite(Callback callback)

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
@ -48,18 +49,25 @@ public interface ISession extends Session
public void removeStream(IStream stream); public void removeStream(IStream stream);
/** /**
* <p>Enqueues the given frames to be written to the connection.</p> * <p>Sends the given list of frames to create a new {@link Stream}.</p>
* *
* @param stream the stream the frames belong to * @param frames the list of frames to send
* @param callback the callback that gets notified when the frames have been sent * @param promise the promise that gets notified of the stream creation
* @param frame the first frame to enqueue * @param listener the listener that gets notified of stream events
* @param frames additional frames to enqueue
*/ */
public void frames(IStream stream, Callback callback, Frame frame, Frame... frames); public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener);
/**
* <p>Enqueues the given frames to be written to the connection.</p>
* @param stream the stream the frames belong to
* @param frames the frames to enqueue
* @param callback the callback that gets notified when the frames have been sent
*/
public void frames(IStream stream, List<? extends Frame> frames, Callback callback);
/** /**
* <p>Enqueues the given PUSH_PROMISE frame to be written to the connection.</p> * <p>Enqueues the given PUSH_PROMISE frame to be written to the connection.</p>
* <p>Differently from {@link #frames(IStream, Callback, Frame, Frame...)}, this method * <p>Differently from {@link #frames(IStream, List, Callback)}, this method
* generates atomically the stream id for the pushed stream.</p> * generates atomically the stream id for the pushed stream.</p>
* *
* @param stream the stream associated to the pushed stream * @param stream the stream associated to the pushed stream

View File

@ -19,9 +19,16 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.io.Closeable; import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.StreamFrame;
import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable
*/ */
public void setListener(Listener listener); public void setListener(Listener listener);
/**
* <p>Sends the given list of frames.</p>
* <p>Typically used to send an HTTP response along with content and possibly trailers.</p>
*
* @param frameList the list of frames to send
* @param callback the callback that gets notified when the frames have been sent
*/
void send(FrameList frameList, Callback callback);
/** /**
* <p>Processes the given {@code frame}, belonging to this stream.</p> * <p>Processes the given {@code frame}, belonging to this stream.</p>
* *
@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable
* @see Listener#onFailure(Stream, int, String, Throwable, Callback) * @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/ */
boolean isResetOrFailed(); boolean isResetOrFailed();
/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
public static class FrameList
{
private final List<StreamFrame> frames;
/**
* <p>Creates a frame list of just the given HEADERS frame.</p>
*
* @param headers the HEADERS frame
*/
public FrameList(HeadersFrame headers)
{
Objects.requireNonNull(headers);
this.frames = List.of(headers);
}
/**
* <p>Creates a frame list of the given frames.</p>
*
* @param headers the HEADERS frame for the headers
* @param data the DATA frame for the content, or null if there is no content
* @param trailers the HEADERS frame for the trailers, or null if there are no trailers
*/
public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers)
{
Objects.requireNonNull(headers);
ArrayList<StreamFrame> frames = new ArrayList<>(3);
int streamId = headers.getStreamId();
if (data != null && data.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data);
if (trailers != null && trailers.getStreamId() != streamId)
throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers);
frames.add(headers);
if (data != null)
frames.add(data);
if (trailers != null)
frames.add(trailers);
this.frames = Collections.unmodifiableList(frames);
}
/**
* @return the stream ID of the frames in this list
*/
public int getStreamId()
{
return frames.get(0).getStreamId();
}
/**
* @return a List of non-null frames
*/
public List<StreamFrame> getFrames()
{
return frames;
}
}
} }

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.http2; package org.eclipse.jetty.http2;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -44,12 +47,13 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
// This method is called when a whole flow controlled frame has been consumed. // This method is called when a whole flow controlled frame has been consumed.
// We send a WindowUpdate every time, even if the frame was very small. // We send a WindowUpdate every time, even if the frame was very small.
final WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length); List<Frame> frames = new ArrayList<>(2);
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
frames.add(sessionFrame);
session.updateRecvWindow(length); session.updateRecvWindow(length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session); LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
Frame[] streamFrame = Frame.EMPTY_ARRAY;
if (stream != null) if (stream != null)
{ {
if (stream.isRemotelyClosed()) if (stream.isRemotelyClosed())
@ -59,14 +63,14 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
} }
else else
{ {
streamFrame = new Frame[1]; WindowUpdateFrame streamFrame = new WindowUpdateFrame(stream.getId(), length);
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length); frames.add(streamFrame);
stream.updateRecvWindow(length); stream.updateRecvWindow(length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream); LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
} }
} }
session.frames(stream, Callback.NOOP, sessionFrame, streamFrame); session.frames(stream, frames, Callback.NOOP);
} }
} }

View File

@ -66,7 +66,9 @@ public interface Stream
} }
/** /**
* <p>Sends the given HEADERS {@code frame} representing an HTTP response.</p> * <p>Sends the given HEADERS {@code frame}.</p>
* <p>Typically used to send an HTTP response with no content and no trailers,
* or to send the HTTP response trailers.</p>
* *
* @param frame the HEADERS frame to send * @param frame the HEADERS frame to send
* @param callback the callback that gets notified when the frame has been sent * @param callback the callback that gets notified when the frame has been sent

View File

@ -20,13 +20,17 @@ package org.eclipse.jetty.http2.frames;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public class DataFrame extends Frame public class DataFrame extends StreamFrame
{ {
private final int streamId;
private final ByteBuffer data; private final ByteBuffer data;
private final boolean endStream; private final boolean endStream;
private final int padding; private final int padding;
public DataFrame(ByteBuffer data, boolean endStream)
{
this(0, data, endStream);
}
public DataFrame(int streamId, ByteBuffer data, boolean endStream) public DataFrame(int streamId, ByteBuffer data, boolean endStream)
{ {
this(streamId, data, endStream, 0); this(streamId, data, endStream, 0);
@ -34,18 +38,12 @@ public class DataFrame extends Frame
public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding) public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding)
{ {
super(FrameType.DATA); super(FrameType.DATA, streamId);
this.streamId = streamId;
this.data = data; this.data = data;
this.endStream = endStream; this.endStream = endStream;
this.padding = padding; this.padding = padding;
} }
public int getStreamId()
{
return streamId;
}
public ByteBuffer getData() public ByteBuffer getData()
{ {
return data; return data;
@ -72,9 +70,15 @@ public class DataFrame extends Frame
return padding; return padding;
} }
@Override
public DataFrame withStreamId(int streamId)
{
return new DataFrame(streamId, getData(), isEndStream());
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s#%d{length:%d,end=%b}", super.toString(), streamId, data.remaining(), endStream); return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), data.remaining(), endStream);
} }
} }

View File

@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
public class HeadersFrame extends Frame public class HeadersFrame extends StreamFrame
{ {
private final int streamId;
private final MetaData metaData; private final MetaData metaData;
private final PriorityFrame priority; private final PriorityFrame priority;
private final boolean endStream; private final boolean endStream;
@ -53,18 +52,12 @@ public class HeadersFrame extends Frame
*/ */
public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream) public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream)
{ {
super(FrameType.HEADERS); super(FrameType.HEADERS, streamId);
this.streamId = streamId;
this.metaData = metaData; this.metaData = metaData;
this.priority = priority; this.priority = priority;
this.endStream = endStream; this.endStream = endStream;
} }
public int getStreamId()
{
return streamId;
}
public MetaData getMetaData() public MetaData getMetaData()
{ {
return metaData; return metaData;
@ -80,10 +73,18 @@ public class HeadersFrame extends Frame
return endStream; return endStream;
} }
@Override
public HeadersFrame withStreamId(int streamId)
{
PriorityFrame priority = getPriority();
priority = priority == null ? null : priority.withStreamId(streamId);
return new HeadersFrame(streamId, getMetaData(), priority, isEndStream());
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s#%d{end=%b}%s", super.toString(), streamId, endStream, return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
priority == null ? "" : String.format("+%s", priority)); priority == null ? "" : String.format("+%s", priority));
} }
} }

View File

@ -18,11 +18,10 @@
package org.eclipse.jetty.http2.frames; package org.eclipse.jetty.http2.frames;
public class PriorityFrame extends Frame public class PriorityFrame extends StreamFrame
{ {
public static final int PRIORITY_LENGTH = 5; public static final int PRIORITY_LENGTH = 5;
private final int streamId;
private final int parentStreamId; private final int parentStreamId;
private final int weight; private final int weight;
private final boolean exclusive; private final boolean exclusive;
@ -34,18 +33,12 @@ public class PriorityFrame extends Frame
public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive) public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive)
{ {
super(FrameType.PRIORITY); super(FrameType.PRIORITY, streamId);
this.streamId = streamId;
this.parentStreamId = parentStreamId; this.parentStreamId = parentStreamId;
this.weight = weight; this.weight = weight;
this.exclusive = exclusive; this.exclusive = exclusive;
} }
public int getStreamId()
{
return streamId;
}
public int getParentStreamId() public int getParentStreamId()
{ {
return parentStreamId; return parentStreamId;
@ -61,9 +54,15 @@ public class PriorityFrame extends Frame
return exclusive; return exclusive;
} }
@Override
public PriorityFrame withStreamId(int streamId)
{
return new PriorityFrame(streamId, getParentStreamId(), getWeight(), isExclusive());
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive); return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), getStreamId(), parentStreamId, weight, exclusive);
} }
} }

View File

@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
public class PushPromiseFrame extends Frame public class PushPromiseFrame extends StreamFrame
{ {
private final int streamId;
private final int promisedStreamId; private final int promisedStreamId;
private final MetaData.Request metaData; private final MetaData.Request metaData;
@ -33,17 +32,11 @@ public class PushPromiseFrame extends Frame
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData) public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData)
{ {
super(FrameType.PUSH_PROMISE); super(FrameType.PUSH_PROMISE, streamId);
this.streamId = streamId;
this.promisedStreamId = promisedStreamId; this.promisedStreamId = promisedStreamId;
this.metaData = metaData; this.metaData = metaData;
} }
public int getStreamId()
{
return streamId;
}
public int getPromisedStreamId() public int getPromisedStreamId()
{ {
return promisedStreamId; return promisedStreamId;
@ -54,9 +47,15 @@ public class PushPromiseFrame extends Frame
return metaData; return metaData;
} }
@Override
public PushPromiseFrame withStreamId(int streamId)
{
return new PushPromiseFrame(getStreamId(), streamId, getMetaData());
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId); return String.format("%s#%d/#%d", super.toString(), getStreamId(), promisedStreamId);
} }
} }

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http2.frames;
public abstract class StreamFrame extends Frame
{
private final int streamId;
public StreamFrame(FrameType type, int streamId)
{
super(type);
this.streamId = streamId;
}
public int getStreamId()
{
return streamId;
}
public abstract StreamFrame withStreamId(int streamId);
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.http2.client.http;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
@ -32,6 +31,8 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
@ -57,7 +58,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
} }
@Override @Override
protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, final Callback callback) protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
{ {
HttpRequest request = exchange.getRequest(); HttpRequest request = exchange.getRequest();
boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod()); boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod());
@ -88,38 +89,52 @@ public class HttpSenderOverHTTP2 extends HttpSender
} }
HeadersFrame headersFrame; HeadersFrame headersFrame;
Promise<Stream> promise; DataFrame dataFrame = null;
HeadersFrame trailersFrame = null;
if (isTunnel) if (isTunnel)
{ {
headersFrame = new HeadersFrame(metaData, null, false); headersFrame = new HeadersFrame(metaData, null, false);
promise = new HeadersPromise(request, callback, stream -> callback.succeeded());
} }
else else
{ {
Supplier<HttpFields> trailerSupplier = request.getTrailers(); boolean hasContent = BufferUtil.hasContent(contentBuffer);
if (BufferUtil.isEmpty(contentBuffer) && lastContent) if (hasContent)
{ {
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); headersFrame = new HeadersFrame(metaData, null, false);
boolean endStream = trailers == null || trailers.size() == 0; if (lastContent)
headersFrame = new HeadersFrame(metaData, null, endStream);
promise = new HeadersPromise(request, callback, stream ->
{ {
if (endStream) HttpFields trailers = retrieveTrailers(request);
callback.succeeded(); boolean hasTrailers = trailers != null;
else dataFrame = new DataFrame(contentBuffer, !hasTrailers);
sendTrailers(stream, trailers, callback); if (hasTrailers)
}); trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
else
{
dataFrame = new DataFrame(contentBuffer, false);
}
} }
else else
{ {
headersFrame = new HeadersFrame(metaData, null, false); if (lastContent)
promise = new HeadersPromise(request, callback, stream -> {
sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback)); HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
headersFrame = new HeadersFrame(metaData, null, !hasTrailers);
if (hasTrailers)
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
else
{
headersFrame = new HeadersFrame(metaData, null, false);
}
} }
} }
// TODO optimize the send of HEADERS and DATA frames.
HttpChannelOverHTTP2 channel = getHttpChannel(); HttpChannelOverHTTP2 channel = getHttpChannel();
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener()); IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame);
((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
} }
private String relativize(String path) private String relativize(String path)
@ -140,29 +155,30 @@ public class HttpSenderOverHTTP2 extends HttpSender
} }
} }
private HttpFields retrieveTrailers(HttpRequest request)
{
Supplier<HttpFields> trailerSupplier = request.getTrailers();
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
return trailers == null || trailers.size() == 0 ? null : trailers;
}
@Override @Override
protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback) protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
{ {
Stream stream = getHttpChannel().getStream(); Stream stream = getHttpChannel().getStream();
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers(); boolean hasContent = contentBuffer.hasRemaining();
sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback);
}
private void sendContent(Stream stream, ByteBuffer buffer, boolean lastContent, Supplier<HttpFields> trailerSupplier, Callback callback)
{
boolean hasContent = buffer.hasRemaining();
if (lastContent) if (lastContent)
{ {
// Call the trailers supplier as late as possible. // Call the trailers supplier as late as possible.
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); HttpFields trailers = retrieveTrailers(exchange.getRequest());
boolean hasTrailers = trailers != null && trailers.size() > 0; boolean hasTrailers = trailers != null && trailers.size() > 0;
if (hasContent) if (hasContent)
{ {
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, !hasTrailers); DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, !hasTrailers);
Callback dataCallback = callback;
if (hasTrailers) if (hasTrailers)
dataCallback = Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed); stream.data(dataFrame, Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed));
stream.data(dataFrame, dataCallback); else
stream.data(dataFrame, callback);
} }
else else
{ {
@ -172,7 +188,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
} }
else else
{ {
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, true); DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, true);
stream.data(dataFrame, callback); stream.data(dataFrame, callback);
} }
} }
@ -181,7 +197,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
{ {
if (hasContent) if (hasContent)
{ {
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, false); DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, false);
stream.data(dataFrame, callback); stream.data(dataFrame, callback);
} }
else else
@ -203,13 +219,11 @@ public class HttpSenderOverHTTP2 extends HttpSender
{ {
private final HttpRequest request; private final HttpRequest request;
private final Callback callback; private final Callback callback;
private final Consumer<Stream> succeed;
private HeadersPromise(HttpRequest request, Callback callback, Consumer<Stream> succeed) private HeadersPromise(HttpRequest request, Callback callback)
{ {
this.request = request; this.request = request;
this.callback = callback; this.callback = callback;
this.succeed = succeed;
} }
@Override @Override
@ -218,7 +232,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
long idleTimeout = request.getIdleTimeout(); long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0) if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout); stream.setIdleTimeout(idleTimeout);
succeed.accept(stream); callback.succeeded();
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.server; package org.eclipse.jetty.http2.server;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
@ -73,9 +74,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
} }
if (windowFrame == null) if (windowFrame == null)
frames(null, Callback.NOOP, settingsFrame, Frame.EMPTY_ARRAY); frames(null, List.of(settingsFrame), Callback.NOOP);
else else
frames(null, Callback.NOOP, settingsFrame, windowFrame); frames(null, List.of(settingsFrame, windowFrame), Callback.NOOP);
} }
@Override @Override

View File

@ -84,158 +84,172 @@ public class HttpTransportOverHTTP2 implements HttpTransport
} }
@Override @Override
public void send(MetaData.Request request, final MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{ {
if (response != null)
sendHeaders(request, response, content, lastContent, callback);
else
sendContent(request, content, lastContent, callback);
}
public void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
metaData = response;
HeadersFrame headersFrame;
DataFrame dataFrame = null;
HeadersFrame trailersFrame = null;
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod()); boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
if (response != null) int status = response.getStatus();
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
if (interimResponse)
{ {
metaData = response; // Must not commit interim responses.
int status = response.getStatus(); if (hasContent)
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
if (interimResponse)
{ {
// Must not commit interim responses. callback.failed(new IllegalStateException("Interim response cannot have content"));
return;
}
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
}
else
{
if (commit.compareAndSet(false, true))
{
if (lastContent)
{
long realContentLength = BufferUtil.length(content);
long contentLength = response.getContentLength();
if (contentLength < 0)
{
metaData = new MetaData.Response(
response.getHttpVersion(),
response.getStatus(),
response.getReason(),
response.getFields(),
realContentLength,
response.getTrailerSupplier()
);
}
else if (hasContent && contentLength != realContentLength)
{
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
return;
}
}
if (hasContent) if (hasContent)
{ {
callback.failed(new IllegalStateException("Interim response cannot have content")); headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
dataFrame = new DataFrame(stream.getId(), content, true);
}
else
{
dataFrame = new DataFrame(stream.getId(), content, false);
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
}
else
{
dataFrame = new DataFrame(stream.getId(), content, false);
}
} }
else else
{ {
transportCallback.send(callback, false, c -> if (lastContent)
sendHeadersFrame(metaData, false, c)); {
if (isTunnel(request, metaData))
{
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
}
else
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
headersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
}
else
{
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
}
}
}
else
{
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
}
} }
} }
else else
{ {
if (commit.compareAndSet(false, true)) callback.failed(new IllegalStateException("committed"));
{ return;
if (lastContent) }
{ }
long realContentLength = BufferUtil.length(content);
long contentLength = response.getContentLength();
if (contentLength < 0)
{
metaData = new MetaData.Response(
response.getHttpVersion(),
response.getStatus(),
response.getReason(),
response.getFields(),
realContentLength,
response.getTrailerSupplier()
);
}
else if (hasContent && contentLength != realContentLength)
{
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
return;
}
}
if (hasContent) HeadersFrame hf = headersFrame;
{ DataFrame df = dataFrame;
Callback commitCallback = new Callback.Nested(callback) HeadersFrame tf = trailersFrame;
{
@Override transportCallback.send(callback, true, c ->
public void succeeded() {
{ if (LOG.isDebugEnabled())
if (lastContent) {
{ LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
HttpFields trailers = retrieveTrailers(); stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
if (trailers != null) System.lineSeparator(), HttpVersion.HTTP_2, metaData.getStatus(),
{ System.lineSeparator(), metaData.getFields());
transportCallback.send(new SendTrailers(getCallback(), trailers), false, c -> }
sendDataFrame(content, true, false, c)); stream.send(new IStream.FrameList(hf, df, tf), c);
} });
else }
{
transportCallback.send(getCallback(), false, c -> public void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback)
sendDataFrame(content, true, true, c)); {
} boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
} boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
else if (hasContent || (lastContent && !isTunnel(request, metaData)))
{ {
transportCallback.send(getCallback(), false, c -> if (lastContent)
sendDataFrame(content, false, false, c)); {
} HttpFields trailers = retrieveTrailers();
} if (trailers == null)
}; {
transportCallback.send(commitCallback, true, c -> transportCallback.send(callback, false, c ->
sendHeadersFrame(metaData, false, c)); sendDataFrame(content, true, true, c));
}
else
{
if (lastContent)
{
if (isTunnel(request, metaData))
{
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, false, c));
}
else
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
transportCallback.send(new SendTrailers(callback, trailers), true, c ->
sendHeadersFrame(metaData, false, c));
}
else
{
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, true, c));
}
}
}
else
{
transportCallback.send(callback, true, c ->
sendHeadersFrame(metaData, false, c));
}
}
} }
else else
{ {
callback.failed(new IllegalStateException("committed")); SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
sendTrailers.succeeded();
}
} }
} }
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
} }
else else
{ {
if (hasContent || (lastContent && !isTunnel(request, metaData))) callback.succeeded();
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
sendTrailers.succeeded();
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, true, true, c));
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
}
else
{
callback.succeeded();
}
} }
} }
@ -291,20 +305,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}, new Stream.Listener.Adapter()); // TODO: handle reset from the client ? }, new Stream.Listener.Adapter()); // TODO: handle reset from the client ?
} }
private void sendHeadersFrame(MetaData.Response info, boolean endStream, Callback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
System.lineSeparator(), info.getFields());
}
HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream);
stream.headers(frame, callback);
}
private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback) private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())