Merge branch 'jetty-10.0.x' of github.com:eclipse/jetty.project into jetty-10.0.x
This commit is contained in:
commit
a437558a48
|
@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel;
|
|||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -563,7 +562,8 @@ public class HttpClient extends ContainerLifeCycle
|
|||
@Override
|
||||
public void succeeded(List<InetSocketAddress> socketAddresses)
|
||||
{
|
||||
Map<String, Object> context = new HashMap<>();
|
||||
// Multiple threads may access the map, especially with DEBUG logging enabled.
|
||||
Map<String, Object> context = new ConcurrentHashMap<>();
|
||||
context.put(ClientConnectionFactory.CLIENT_CONTEXT_KEY, HttpClient.this);
|
||||
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
|
||||
connect(socketAddresses, 0, context);
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.eclipse.jetty.http2.client;
|
|||
import java.net.SocketAddress;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
|
||||
|
@ -391,7 +391,7 @@ public class HTTP2Client extends ContainerLifeCycle
|
|||
private Map<String, Object> contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
|
||||
{
|
||||
if (context == null)
|
||||
context = new HashMap<>();
|
||||
context = new ConcurrentHashMap<>();
|
||||
context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, this);
|
||||
context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listener);
|
||||
context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, promise);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
|
@ -129,11 +130,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
if (windowDelta > 0)
|
||||
{
|
||||
session.updateRecvWindow(windowDelta);
|
||||
session.frames(null, this, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
|
||||
session.frames(null, List.of(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
|
||||
}
|
||||
else
|
||||
{
|
||||
session.frames(null, this, prefaceFrame, settingsFrame);
|
||||
session.frames(null, List.of(prefaceFrame, settingsFrame), this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.eclipse.jetty.http2.api.Session;
|
|||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
|
@ -198,14 +199,15 @@ public class StreamResetTest extends AbstractTest
|
|||
{
|
||||
// Simulate that there is pending data to send.
|
||||
IStream stream = (IStream)s;
|
||||
stream.getSession().frames(stream, new Callback()
|
||||
List<Frame> frames = List.of(new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
|
||||
stream.getSession().frames(stream, frames, new Callback()
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
serverResetLatch.countDown();
|
||||
}
|
||||
}, new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -18,11 +18,11 @@
|
|||
|
||||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
|
||||
import org.eclipse.jetty.util.Atomics;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
@ -171,7 +171,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
|
||||
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
|
||||
{
|
||||
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
|
||||
session.frames(stream, List.of(frame), Callback.NOOP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -115,6 +115,25 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
return false;
|
||||
}
|
||||
|
||||
public boolean append(List<Entry> list)
|
||||
{
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
if (closed == null)
|
||||
{
|
||||
list.forEach(entries::offer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Appended {}, entries={}", list, entries.size());
|
||||
}
|
||||
}
|
||||
if (closed == null)
|
||||
return true;
|
||||
list.forEach(entry -> closed(entry, closed));
|
||||
return false;
|
||||
}
|
||||
|
||||
private int getWindowQueueSize()
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
|
@ -416,6 +435,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
|
||||
public abstract long onFlushed(long bytes) throws IOException;
|
||||
|
||||
boolean hasHighPriority()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.ArrayDeque;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -36,6 +37,7 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
|
@ -53,6 +55,7 @@ import org.eclipse.jetty.http2.frames.PriorityFrame;
|
|||
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http2.frames.StreamFrame;
|
||||
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
|
||||
import org.eclipse.jetty.http2.generator.Generator;
|
||||
import org.eclipse.jetty.http2.hpack.HpackException;
|
||||
|
@ -602,7 +605,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
@Override
|
||||
public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
|
||||
{
|
||||
streamCreator.newStream(frame, promise, listener);
|
||||
newStream(new IStream.FrameList(frame), promise, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener)
|
||||
{
|
||||
streamCreator.newStream(frames, promise, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -617,11 +626,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
*/
|
||||
public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut)
|
||||
{
|
||||
int streamId = frameIn.getStreamId();
|
||||
HeadersFrame frame = frameIn;
|
||||
int streamId = frame.getStreamId();
|
||||
if (streamId <= 0)
|
||||
{
|
||||
streamId = localStreamIds.getAndAdd(2);
|
||||
frame = frame.withStreamId(streamId);
|
||||
}
|
||||
|
||||
HeadersFrame frame = streamCreator.prepareHeadersFrame(streamId, frameIn);
|
||||
if (frameOut != null)
|
||||
frameOut[0] = frame;
|
||||
|
||||
|
@ -749,46 +761,48 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private void control(IStream stream, Callback callback, Frame frame)
|
||||
{
|
||||
frames(stream, callback, frame, Frame.EMPTY_ARRAY);
|
||||
frames(stream, List.of(frame), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
|
||||
public void frames(IStream stream, List<? extends Frame> frames, Callback callback)
|
||||
{
|
||||
// We want to generate as late as possible to allow re-prioritization;
|
||||
// generation will happen while processing the entries.
|
||||
|
||||
// The callback needs to be notified only when the last frame completes.
|
||||
|
||||
int length = frames.length;
|
||||
if (length == 0)
|
||||
int count = frames.size();
|
||||
if (count > 1)
|
||||
callback = new CountingCallback(callback, count);
|
||||
for (int i = 1; i <= count; ++i)
|
||||
{
|
||||
frame(new ControlEntry(frame, stream, callback), true);
|
||||
}
|
||||
else
|
||||
{
|
||||
callback = new CountingCallback(callback, 1 + length);
|
||||
frame(new ControlEntry(frame, stream, callback), false);
|
||||
for (int i = 1; i <= length; ++i)
|
||||
{
|
||||
frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
|
||||
}
|
||||
Frame frame = frames.get(i - 1);
|
||||
HTTP2Flusher.Entry entry = newEntry(frame, stream, callback);
|
||||
frame(entry, i == count);
|
||||
}
|
||||
}
|
||||
|
||||
private HTTP2Flusher.Entry newEntry(Frame frame, IStream stream, Callback callback)
|
||||
{
|
||||
return frame.getType() == FrameType.DATA
|
||||
? new DataEntry((DataFrame)frame, stream, callback)
|
||||
: new ControlEntry(frame, stream, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void data(IStream stream, Callback callback, DataFrame frame)
|
||||
{
|
||||
// We want to generate as late as possible to allow re-prioritization.
|
||||
frame(new DataEntry(frame, stream, callback), true);
|
||||
frame(newEntry(frame, stream, callback), true);
|
||||
}
|
||||
|
||||
private void frame(HTTP2Flusher.Entry entry, boolean flush)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry.frame, this);
|
||||
LOG.debug("{} {} on {}", flush ? "Sending" : "Queueing", entry, this);
|
||||
// Ping frames are prepended to process them as soon as possible.
|
||||
boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
|
||||
boolean queued = entry.hasHighPriority() ? flusher.prepend(entry) : flusher.append(entry);
|
||||
if (queued && flush)
|
||||
{
|
||||
if (entry.stream != null)
|
||||
|
@ -1356,6 +1370,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean hasHighPriority()
|
||||
{
|
||||
return frame.getType() == FrameType.PING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
|
@ -1700,7 +1720,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private void complete()
|
||||
{
|
||||
frames(null, Callback.NOOP, newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame());
|
||||
frames(null, List.of(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1760,26 +1780,40 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
int streamId = reserveSlot(slot, currentStreamId);
|
||||
|
||||
if (currentStreamId <= 0)
|
||||
frame = new PriorityFrame(streamId, frame.getParentStreamId(), frame.getWeight(), frame.isExclusive());
|
||||
frame = frame.withStreamId(streamId);
|
||||
|
||||
assignSlotAndFlush(slot, new ControlEntry(frame, null, callback));
|
||||
assignSlotAndFlush(slot, newEntry(frame, null, callback));
|
||||
return streamId;
|
||||
}
|
||||
|
||||
private void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
|
||||
private void newStream(IStream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
|
||||
{
|
||||
Slot slot = new Slot();
|
||||
int currentStreamId = frame.getStreamId();
|
||||
int currentStreamId = frameList.getStreamId();
|
||||
int streamId = reserveSlot(slot, currentStreamId);
|
||||
|
||||
frame = prepareHeadersFrame(streamId, frame);
|
||||
List<StreamFrame> frames = frameList.getFrames();
|
||||
if (currentStreamId <= 0)
|
||||
{
|
||||
frames = frames.stream()
|
||||
.map(frame -> frame.withStreamId(streamId))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
|
||||
HeadersFrame headersFrame = (HeadersFrame)frameList.getFrames().get(0);
|
||||
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)headersFrame.getMetaData());
|
||||
stream.setListener(listener);
|
||||
stream.process(new PrefaceFrame(), Callback.NOOP);
|
||||
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
|
||||
|
||||
int count = frames.size();
|
||||
Callback streamCallback = new StreamPromiseCallback(promise, stream);
|
||||
Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count);
|
||||
List<HTTP2Flusher.Entry> entries = frames.stream()
|
||||
.map(frame -> newEntry(frame, stream, callback))
|
||||
.collect(Collectors.toList());
|
||||
assignSlotAndFlush(slot, entries);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -1787,28 +1821,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
|
||||
private HeadersFrame prepareHeadersFrame(int streamId, HeadersFrame frame)
|
||||
{
|
||||
if (frame.getStreamId() <= 0)
|
||||
{
|
||||
PriorityFrame priority = frame.getPriority();
|
||||
priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), priority.getWeight(), priority.isExclusive());
|
||||
frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
|
||||
}
|
||||
return frame;
|
||||
}
|
||||
|
||||
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
|
||||
{
|
||||
Slot slot = new Slot();
|
||||
int streamId = reserveSlot(slot, 0);
|
||||
frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
|
||||
frame = frame.withStreamId(streamId);
|
||||
|
||||
try
|
||||
{
|
||||
IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData());
|
||||
stream.setListener(listener);
|
||||
assignSlotAndFlush(slot, new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
|
||||
assignSlotAndFlush(slot, newEntry(frame, stream, new StreamPromiseCallback(promise, stream)));
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -1816,13 +1839,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
|
||||
private void assignSlotAndFlush(Slot slot, ControlEntry entry)
|
||||
{
|
||||
// Every time a slot entry is assigned, we must flush.
|
||||
slot.entry = entry;
|
||||
flush();
|
||||
}
|
||||
|
||||
private int reserveSlot(Slot slot, int streamId)
|
||||
{
|
||||
if (streamId <= 0)
|
||||
|
@ -1843,6 +1859,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
return streamId;
|
||||
}
|
||||
|
||||
private void assignSlotAndFlush(Slot slot, HTTP2Flusher.Entry entry)
|
||||
{
|
||||
assignSlotAndFlush(slot, List.of(entry));
|
||||
}
|
||||
|
||||
private void assignSlotAndFlush(Slot slot, List<HTTP2Flusher.Entry> entries)
|
||||
{
|
||||
// Every time a slot entry is assigned, we must flush.
|
||||
slot.entries = entries;
|
||||
flush();
|
||||
}
|
||||
|
||||
private void releaseSlotFlushAndFail(Slot slot, Promise<Stream> promise, Throwable x)
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
|
@ -1854,16 +1882,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
|
||||
/**
|
||||
* Flush goes over the entries of the slots queue to flush the entries,
|
||||
* until either one of the following two conditions is true:
|
||||
* - The queue is empty.
|
||||
* - It reaches a slot with a null entry.
|
||||
* When a slot with a null entry is encountered, this means a concurrent thread reserved a slot
|
||||
* but hasn't set its entry yet. Since entries must be flushed in order, the thread encountering
|
||||
* the null entry must bail out and it is up to the concurrent thread to finish up flushing.
|
||||
* Note that only one thread can flush at any one time, if two threads happen to call flush
|
||||
* concurrently, one will do the work while the other will bail out, so it is safe that all
|
||||
* threads call flush after they're done reserving a slot and setting the entry.
|
||||
* <p>Iterates over the entries of the slot queue to flush them.</p>
|
||||
* <p>The flush proceeds until either one of the following two conditions is true:</p>
|
||||
* <ul>
|
||||
* <li>the queue is empty</li>
|
||||
* <li>a slot with a no entries is encountered</li>
|
||||
* </ul>
|
||||
* <p>When a slot with a no entries is encountered, then it means that a concurrent thread reserved
|
||||
* a slot but hasn't set its entries yet. Since slots must be flushed in order, the thread encountering
|
||||
* the slot with no entries must bail out and it is up to the concurrent thread to finish up flushing.</p>
|
||||
* <p>Note that only one thread can flush at any time; if two threads happen to call this method
|
||||
* concurrently, one will do the work while the other will bail out, so it is safe that all
|
||||
* threads call this method after they are done reserving a slot and setting the entries.</p>
|
||||
*/
|
||||
private void flush()
|
||||
{
|
||||
|
@ -1871,7 +1901,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
boolean queued = false;
|
||||
while (true)
|
||||
{
|
||||
ControlEntry entry;
|
||||
List<HTTP2Flusher.Entry> entries;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
if (flushing == null)
|
||||
|
@ -1880,18 +1910,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
return; // Another thread is flushing.
|
||||
|
||||
Slot slot = slots.peek();
|
||||
entry = slot == null ? null : slot.entry;
|
||||
entries = slot == null ? null : slot.entries;
|
||||
|
||||
if (entry == null)
|
||||
if (entries == null)
|
||||
{
|
||||
flushing = null;
|
||||
// No more slots or null entry, so we may iterate on the flusher.
|
||||
// No more slots or null entries, so we may iterate on the flusher.
|
||||
break;
|
||||
}
|
||||
|
||||
slots.poll();
|
||||
}
|
||||
queued |= flusher.append(entry);
|
||||
queued |= flusher.append(entries);
|
||||
}
|
||||
if (queued)
|
||||
flusher.iterate();
|
||||
|
@ -1899,7 +1929,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private class Slot
|
||||
{
|
||||
private volatile ControlEntry entry;
|
||||
private volatile List<HTTP2Flusher.Entry> entries;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.EOFException;
|
|||
import java.io.IOException;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -122,9 +123,15 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
|
||||
@Override
|
||||
public void headers(HeadersFrame frame, Callback callback)
|
||||
{
|
||||
send(new FrameList(frame), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(FrameList frameList, Callback callback)
|
||||
{
|
||||
if (startWrite(callback))
|
||||
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
|
||||
session.frames(this, frameList.getFrames(), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -150,7 +157,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
localReset = true;
|
||||
failure = new EOFException("reset");
|
||||
}
|
||||
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
|
||||
session.frames(this, List.of(frame), callback);
|
||||
}
|
||||
|
||||
private boolean startWrite(Callback callback)
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
|
@ -48,18 +49,25 @@ public interface ISession extends Session
|
|||
public void removeStream(IStream stream);
|
||||
|
||||
/**
|
||||
* <p>Enqueues the given frames to be written to the connection.</p>
|
||||
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
|
||||
*
|
||||
* @param stream the stream the frames belong to
|
||||
* @param callback the callback that gets notified when the frames have been sent
|
||||
* @param frame the first frame to enqueue
|
||||
* @param frames additional frames to enqueue
|
||||
* @param frames the list of frames to send
|
||||
* @param promise the promise that gets notified of the stream creation
|
||||
* @param listener the listener that gets notified of stream events
|
||||
*/
|
||||
public void frames(IStream stream, Callback callback, Frame frame, Frame... frames);
|
||||
public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.Listener listener);
|
||||
|
||||
/**
|
||||
* <p>Enqueues the given frames to be written to the connection.</p>
|
||||
* @param stream the stream the frames belong to
|
||||
* @param frames the frames to enqueue
|
||||
* @param callback the callback that gets notified when the frames have been sent
|
||||
*/
|
||||
public void frames(IStream stream, List<? extends Frame> frames, Callback callback);
|
||||
|
||||
/**
|
||||
* <p>Enqueues the given PUSH_PROMISE frame to be written to the connection.</p>
|
||||
* <p>Differently from {@link #frames(IStream, Callback, Frame, Frame...)}, this method
|
||||
* <p>Differently from {@link #frames(IStream, List, Callback)}, this method
|
||||
* generates atomically the stream id for the pushed stream.</p>
|
||||
*
|
||||
* @param stream the stream associated to the pushed stream
|
||||
|
|
|
@ -19,9 +19,16 @@
|
|||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.StreamFrame;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
|
@ -52,6 +59,15 @@ public interface IStream extends Stream, Attachable, Closeable
|
|||
*/
|
||||
public void setListener(Listener listener);
|
||||
|
||||
/**
|
||||
* <p>Sends the given list of frames.</p>
|
||||
* <p>Typically used to send HTTP headers along with content and possibly trailers.</p>
|
||||
*
|
||||
* @param frameList the list of frames to send
|
||||
* @param callback the callback that gets notified when the frames have been sent
|
||||
*/
|
||||
void send(FrameList frameList, Callback callback);
|
||||
|
||||
/**
|
||||
* <p>Processes the given {@code frame}, belonging to this stream.</p>
|
||||
*
|
||||
|
@ -109,4 +125,63 @@ public interface IStream extends Stream, Attachable, Closeable
|
|||
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
|
||||
*/
|
||||
boolean isResetOrFailed();
|
||||
|
||||
/**
|
||||
* <p>An ordered list of frames belonging to the same stream.</p>
|
||||
*/
|
||||
public static class FrameList
|
||||
{
|
||||
private final List<StreamFrame> frames;
|
||||
|
||||
/**
|
||||
* <p>Creates a frame list of just the given HEADERS frame.</p>
|
||||
*
|
||||
* @param headers the HEADERS frame
|
||||
*/
|
||||
public FrameList(HeadersFrame headers)
|
||||
{
|
||||
Objects.requireNonNull(headers);
|
||||
this.frames = List.of(headers);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Creates a frame list of the given frames.</p>
|
||||
*
|
||||
* @param headers the HEADERS frame for the headers
|
||||
* @param data the DATA frame for the content, or null if there is no content
|
||||
* @param trailers the HEADERS frame for the trailers, or null if there are no trailers
|
||||
*/
|
||||
public FrameList(HeadersFrame headers, DataFrame data, HeadersFrame trailers)
|
||||
{
|
||||
Objects.requireNonNull(headers);
|
||||
ArrayList<StreamFrame> frames = new ArrayList<>(3);
|
||||
int streamId = headers.getStreamId();
|
||||
if (data != null && data.getStreamId() != streamId)
|
||||
throw new IllegalArgumentException("Invalid stream ID for DATA frame " + data);
|
||||
if (trailers != null && trailers.getStreamId() != streamId)
|
||||
throw new IllegalArgumentException("Invalid stream ID for HEADERS frame " + trailers);
|
||||
frames.add(headers);
|
||||
if (data != null)
|
||||
frames.add(data);
|
||||
if (trailers != null)
|
||||
frames.add(trailers);
|
||||
this.frames = Collections.unmodifiableList(frames);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the stream ID of the frames in this list
|
||||
*/
|
||||
public int getStreamId()
|
||||
{
|
||||
return frames.get(0).getStreamId();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a List of non-null frames
|
||||
*/
|
||||
public List<StreamFrame> getFrames()
|
||||
{
|
||||
return frames;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
|
||||
package org.eclipse.jetty.http2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
@ -44,12 +47,13 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
// This method is called when a whole flow controlled frame has been consumed.
|
||||
// We send a WindowUpdate every time, even if the frame was very small.
|
||||
|
||||
final WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
|
||||
List<Frame> frames = new ArrayList<>(2);
|
||||
WindowUpdateFrame sessionFrame = new WindowUpdateFrame(0, length);
|
||||
frames.add(sessionFrame);
|
||||
session.updateRecvWindow(length);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, increased session recv window by {} for {}", length, session);
|
||||
|
||||
Frame[] streamFrame = Frame.EMPTY_ARRAY;
|
||||
if (stream != null)
|
||||
{
|
||||
if (stream.isRemotelyClosed())
|
||||
|
@ -59,14 +63,14 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
|
|||
}
|
||||
else
|
||||
{
|
||||
streamFrame = new Frame[1];
|
||||
streamFrame[0] = new WindowUpdateFrame(stream.getId(), length);
|
||||
WindowUpdateFrame streamFrame = new WindowUpdateFrame(stream.getId(), length);
|
||||
frames.add(streamFrame);
|
||||
stream.updateRecvWindow(length);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Data consumed, increased stream recv window by {} for {}", length, stream);
|
||||
}
|
||||
}
|
||||
|
||||
session.frames(stream, Callback.NOOP, sessionFrame, streamFrame);
|
||||
session.frames(stream, frames, Callback.NOOP);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,7 +66,8 @@ public interface Stream
|
|||
}
|
||||
|
||||
/**
|
||||
* <p>Sends the given HEADERS {@code frame} representing an HTTP response.</p>
|
||||
* <p>Sends the given HEADERS {@code frame}.</p>
|
||||
* <p>Typically used to send an HTTP response or to send the HTTP response trailers.</p>
|
||||
*
|
||||
* @param frame the HEADERS frame to send
|
||||
* @param callback the callback that gets notified when the frame has been sent
|
||||
|
|
|
@ -20,13 +20,17 @@ package org.eclipse.jetty.http2.frames;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class DataFrame extends Frame
|
||||
public class DataFrame extends StreamFrame
|
||||
{
|
||||
private final int streamId;
|
||||
private final ByteBuffer data;
|
||||
private final boolean endStream;
|
||||
private final int padding;
|
||||
|
||||
public DataFrame(ByteBuffer data, boolean endStream)
|
||||
{
|
||||
this(0, data, endStream);
|
||||
}
|
||||
|
||||
public DataFrame(int streamId, ByteBuffer data, boolean endStream)
|
||||
{
|
||||
this(streamId, data, endStream, 0);
|
||||
|
@ -34,18 +38,12 @@ public class DataFrame extends Frame
|
|||
|
||||
public DataFrame(int streamId, ByteBuffer data, boolean endStream, int padding)
|
||||
{
|
||||
super(FrameType.DATA);
|
||||
this.streamId = streamId;
|
||||
super(FrameType.DATA, streamId);
|
||||
this.data = data;
|
||||
this.endStream = endStream;
|
||||
this.padding = padding;
|
||||
}
|
||||
|
||||
public int getStreamId()
|
||||
{
|
||||
return streamId;
|
||||
}
|
||||
|
||||
public ByteBuffer getData()
|
||||
{
|
||||
return data;
|
||||
|
@ -72,9 +70,15 @@ public class DataFrame extends Frame
|
|||
return padding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataFrame withStreamId(int streamId)
|
||||
{
|
||||
return new DataFrame(streamId, getData(), isEndStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s#%d{length:%d,end=%b}", super.toString(), streamId, data.remaining(), endStream);
|
||||
return String.format("%s#%d{length:%d,end=%b}", super.toString(), getStreamId(), data.remaining(), endStream);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
|
|||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
|
||||
public class HeadersFrame extends Frame
|
||||
public class HeadersFrame extends StreamFrame
|
||||
{
|
||||
private final int streamId;
|
||||
private final MetaData metaData;
|
||||
private final PriorityFrame priority;
|
||||
private final boolean endStream;
|
||||
|
@ -53,18 +52,12 @@ public class HeadersFrame extends Frame
|
|||
*/
|
||||
public HeadersFrame(int streamId, MetaData metaData, PriorityFrame priority, boolean endStream)
|
||||
{
|
||||
super(FrameType.HEADERS);
|
||||
this.streamId = streamId;
|
||||
super(FrameType.HEADERS, streamId);
|
||||
this.metaData = metaData;
|
||||
this.priority = priority;
|
||||
this.endStream = endStream;
|
||||
}
|
||||
|
||||
public int getStreamId()
|
||||
{
|
||||
return streamId;
|
||||
}
|
||||
|
||||
public MetaData getMetaData()
|
||||
{
|
||||
return metaData;
|
||||
|
@ -80,10 +73,18 @@ public class HeadersFrame extends Frame
|
|||
return endStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeadersFrame withStreamId(int streamId)
|
||||
{
|
||||
PriorityFrame priority = getPriority();
|
||||
priority = priority == null ? null : priority.withStreamId(streamId);
|
||||
return new HeadersFrame(streamId, getMetaData(), priority, isEndStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s#%d{end=%b}%s", super.toString(), streamId, endStream,
|
||||
return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
|
||||
priority == null ? "" : String.format("+%s", priority));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.http2.frames;
|
||||
|
||||
public class PriorityFrame extends Frame
|
||||
public class PriorityFrame extends StreamFrame
|
||||
{
|
||||
public static final int PRIORITY_LENGTH = 5;
|
||||
|
||||
private final int streamId;
|
||||
private final int parentStreamId;
|
||||
private final int weight;
|
||||
private final boolean exclusive;
|
||||
|
@ -34,18 +33,12 @@ public class PriorityFrame extends Frame
|
|||
|
||||
public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive)
|
||||
{
|
||||
super(FrameType.PRIORITY);
|
||||
this.streamId = streamId;
|
||||
super(FrameType.PRIORITY, streamId);
|
||||
this.parentStreamId = parentStreamId;
|
||||
this.weight = weight;
|
||||
this.exclusive = exclusive;
|
||||
}
|
||||
|
||||
public int getStreamId()
|
||||
{
|
||||
return streamId;
|
||||
}
|
||||
|
||||
public int getParentStreamId()
|
||||
{
|
||||
return parentStreamId;
|
||||
|
@ -61,9 +54,15 @@ public class PriorityFrame extends Frame
|
|||
return exclusive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PriorityFrame withStreamId(int streamId)
|
||||
{
|
||||
return new PriorityFrame(streamId, getParentStreamId(), getWeight(), isExclusive());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive);
|
||||
return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), getStreamId(), parentStreamId, weight, exclusive);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,9 +20,8 @@ package org.eclipse.jetty.http2.frames;
|
|||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
|
||||
public class PushPromiseFrame extends Frame
|
||||
public class PushPromiseFrame extends StreamFrame
|
||||
{
|
||||
private final int streamId;
|
||||
private final int promisedStreamId;
|
||||
private final MetaData.Request metaData;
|
||||
|
||||
|
@ -33,17 +32,11 @@ public class PushPromiseFrame extends Frame
|
|||
|
||||
public PushPromiseFrame(int streamId, int promisedStreamId, MetaData.Request metaData)
|
||||
{
|
||||
super(FrameType.PUSH_PROMISE);
|
||||
this.streamId = streamId;
|
||||
super(FrameType.PUSH_PROMISE, streamId);
|
||||
this.promisedStreamId = promisedStreamId;
|
||||
this.metaData = metaData;
|
||||
}
|
||||
|
||||
public int getStreamId()
|
||||
{
|
||||
return streamId;
|
||||
}
|
||||
|
||||
public int getPromisedStreamId()
|
||||
{
|
||||
return promisedStreamId;
|
||||
|
@ -54,9 +47,15 @@ public class PushPromiseFrame extends Frame
|
|||
return metaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PushPromiseFrame withStreamId(int streamId)
|
||||
{
|
||||
return new PushPromiseFrame(getStreamId(), streamId, getMetaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId);
|
||||
return String.format("%s#%d/#%d", super.toString(), getStreamId(), promisedStreamId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under
|
||||
// the terms of the Eclipse Public License 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// This Source Code may also be made available under the following
|
||||
// Secondary Licenses when the conditions for such availability set
|
||||
// forth in the Eclipse Public License, v. 2.0 are satisfied:
|
||||
// the Apache License v2.0 which is available at
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.frames;
|
||||
|
||||
public abstract class StreamFrame extends Frame
|
||||
{
|
||||
private final int streamId;
|
||||
|
||||
public StreamFrame(FrameType type, int streamId)
|
||||
{
|
||||
super(type);
|
||||
this.streamId = streamId;
|
||||
}
|
||||
|
||||
public int getStreamId()
|
||||
{
|
||||
return streamId;
|
||||
}
|
||||
|
||||
public abstract StreamFrame withStreamId(int streamId);
|
||||
}
|
|
@ -20,7 +20,6 @@ package org.eclipse.jetty.http2.client.http;
|
|||
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.client.HttpExchange;
|
||||
|
@ -32,6 +31,8 @@ import org.eclipse.jetty.http.HttpMethod;
|
|||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
|
@ -57,7 +58,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, final Callback callback)
|
||||
protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
|
||||
{
|
||||
HttpRequest request = exchange.getRequest();
|
||||
boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod());
|
||||
|
@ -88,38 +89,52 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
}
|
||||
|
||||
HeadersFrame headersFrame;
|
||||
Promise<Stream> promise;
|
||||
DataFrame dataFrame = null;
|
||||
HeadersFrame trailersFrame = null;
|
||||
|
||||
if (isTunnel)
|
||||
{
|
||||
headersFrame = new HeadersFrame(metaData, null, false);
|
||||
promise = new HeadersPromise(request, callback, stream -> callback.succeeded());
|
||||
}
|
||||
else
|
||||
{
|
||||
Supplier<HttpFields> trailerSupplier = request.getTrailers();
|
||||
if (BufferUtil.isEmpty(contentBuffer) && lastContent)
|
||||
boolean hasContent = BufferUtil.hasContent(contentBuffer);
|
||||
if (hasContent)
|
||||
{
|
||||
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||
boolean endStream = trailers == null || trailers.size() == 0;
|
||||
headersFrame = new HeadersFrame(metaData, null, endStream);
|
||||
promise = new HeadersPromise(request, callback, stream ->
|
||||
headersFrame = new HeadersFrame(metaData, null, false);
|
||||
if (lastContent)
|
||||
{
|
||||
if (endStream)
|
||||
callback.succeeded();
|
||||
else
|
||||
sendTrailers(stream, trailers, callback);
|
||||
});
|
||||
HttpFields trailers = retrieveTrailers(request);
|
||||
boolean hasTrailers = trailers != null;
|
||||
dataFrame = new DataFrame(contentBuffer, !hasTrailers);
|
||||
if (hasTrailers)
|
||||
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
dataFrame = new DataFrame(contentBuffer, false);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
headersFrame = new HeadersFrame(metaData, null, false);
|
||||
promise = new HeadersPromise(request, callback, stream ->
|
||||
sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback));
|
||||
if (lastContent)
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers(request);
|
||||
boolean hasTrailers = trailers != null;
|
||||
headersFrame = new HeadersFrame(metaData, null, !hasTrailers);
|
||||
if (hasTrailers)
|
||||
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), null, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
headersFrame = new HeadersFrame(metaData, null, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO optimize the send of HEADERS and DATA frames.
|
||||
|
||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
|
||||
IStream.FrameList frameList = new IStream.FrameList(headersFrame, dataFrame, trailersFrame);
|
||||
((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
|
||||
}
|
||||
|
||||
private String relativize(String path)
|
||||
|
@ -140,29 +155,30 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
}
|
||||
}
|
||||
|
||||
private HttpFields retrieveTrailers(HttpRequest request)
|
||||
{
|
||||
Supplier<HttpFields> trailerSupplier = request.getTrailers();
|
||||
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||
return trailers == null || trailers.size() == 0 ? null : trailers;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
|
||||
{
|
||||
Stream stream = getHttpChannel().getStream();
|
||||
Supplier<HttpFields> trailerSupplier = exchange.getRequest().getTrailers();
|
||||
sendContent(stream, contentBuffer, lastContent, trailerSupplier, callback);
|
||||
}
|
||||
|
||||
private void sendContent(Stream stream, ByteBuffer buffer, boolean lastContent, Supplier<HttpFields> trailerSupplier, Callback callback)
|
||||
{
|
||||
boolean hasContent = buffer.hasRemaining();
|
||||
boolean hasContent = contentBuffer.hasRemaining();
|
||||
if (lastContent)
|
||||
{
|
||||
// Call the trailers supplier as late as possible.
|
||||
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
|
||||
HttpFields trailers = retrieveTrailers(exchange.getRequest());
|
||||
boolean hasTrailers = trailers != null && trailers.size() > 0;
|
||||
if (hasContent)
|
||||
{
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, !hasTrailers);
|
||||
Callback dataCallback = callback;
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, !hasTrailers);
|
||||
if (hasTrailers)
|
||||
dataCallback = Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed);
|
||||
stream.data(dataFrame, dataCallback);
|
||||
stream.data(dataFrame, Callback.from(() -> sendTrailers(stream, trailers, callback), callback::failed));
|
||||
else
|
||||
stream.data(dataFrame, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -172,7 +188,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
}
|
||||
else
|
||||
{
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, true);
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, true);
|
||||
stream.data(dataFrame, callback);
|
||||
}
|
||||
}
|
||||
|
@ -181,7 +197,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
{
|
||||
if (hasContent)
|
||||
{
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), buffer, false);
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), contentBuffer, false);
|
||||
stream.data(dataFrame, callback);
|
||||
}
|
||||
else
|
||||
|
@ -203,13 +219,11 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
{
|
||||
private final HttpRequest request;
|
||||
private final Callback callback;
|
||||
private final Consumer<Stream> succeed;
|
||||
|
||||
private HeadersPromise(HttpRequest request, Callback callback, Consumer<Stream> succeed)
|
||||
private HeadersPromise(HttpRequest request, Callback callback)
|
||||
{
|
||||
this.request = request;
|
||||
this.callback = callback;
|
||||
this.succeed = succeed;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -218,7 +232,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
long idleTimeout = request.getIdleTimeout();
|
||||
if (idleTimeout >= 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
succeed.accept(stream);
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2.server;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
|
@ -73,9 +74,9 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
}
|
||||
|
||||
if (windowFrame == null)
|
||||
frames(null, Callback.NOOP, settingsFrame, Frame.EMPTY_ARRAY);
|
||||
frames(null, List.of(settingsFrame), Callback.NOOP);
|
||||
else
|
||||
frames(null, Callback.NOOP, settingsFrame, windowFrame);
|
||||
frames(null, List.of(settingsFrame, windowFrame), Callback.NOOP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -84,158 +84,172 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
}
|
||||
|
||||
@Override
|
||||
public void send(MetaData.Request request, final MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
{
|
||||
if (response != null)
|
||||
sendHeaders(request, response, content, lastContent, callback);
|
||||
else
|
||||
sendContent(request, content, lastContent, callback);
|
||||
}
|
||||
|
||||
public void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
{
|
||||
metaData = response;
|
||||
|
||||
HeadersFrame headersFrame;
|
||||
DataFrame dataFrame = null;
|
||||
HeadersFrame trailersFrame = null;
|
||||
|
||||
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
|
||||
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
|
||||
if (response != null)
|
||||
int status = response.getStatus();
|
||||
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
|
||||
if (interimResponse)
|
||||
{
|
||||
metaData = response;
|
||||
int status = response.getStatus();
|
||||
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
|
||||
if (interimResponse)
|
||||
// Must not commit interim responses.
|
||||
if (hasContent)
|
||||
{
|
||||
// Must not commit interim responses.
|
||||
callback.failed(new IllegalStateException("Interim response cannot have content"));
|
||||
return;
|
||||
}
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (commit.compareAndSet(false, true))
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
long realContentLength = BufferUtil.length(content);
|
||||
long contentLength = response.getContentLength();
|
||||
if (contentLength < 0)
|
||||
{
|
||||
metaData = new MetaData.Response(
|
||||
response.getHttpVersion(),
|
||||
response.getStatus(),
|
||||
response.getReason(),
|
||||
response.getFields(),
|
||||
realContentLength,
|
||||
response.getTrailerSupplier()
|
||||
);
|
||||
}
|
||||
else if (hasContent && contentLength != realContentLength)
|
||||
{
|
||||
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasContent)
|
||||
{
|
||||
callback.failed(new IllegalStateException("Interim response cannot have content"));
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
if (lastContent)
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers == null)
|
||||
{
|
||||
dataFrame = new DataFrame(stream.getId(), content, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
dataFrame = new DataFrame(stream.getId(), content, false);
|
||||
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
dataFrame = new DataFrame(stream.getId(), content, false);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, false, c ->
|
||||
sendHeadersFrame(metaData, false, c));
|
||||
if (lastContent)
|
||||
{
|
||||
if (isTunnel(request, metaData))
|
||||
{
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers == null)
|
||||
{
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
trailersFrame = new HeadersFrame(stream.getId(), new MetaData(HttpVersion.HTTP_2, trailers), null, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
headersFrame = new HeadersFrame(stream.getId(), metaData, null, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (commit.compareAndSet(false, true))
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
long realContentLength = BufferUtil.length(content);
|
||||
long contentLength = response.getContentLength();
|
||||
if (contentLength < 0)
|
||||
{
|
||||
metaData = new MetaData.Response(
|
||||
response.getHttpVersion(),
|
||||
response.getStatus(),
|
||||
response.getReason(),
|
||||
response.getFields(),
|
||||
realContentLength,
|
||||
response.getTrailerSupplier()
|
||||
);
|
||||
}
|
||||
else if (hasContent && contentLength != realContentLength)
|
||||
{
|
||||
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
callback.failed(new IllegalStateException("committed"));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasContent)
|
||||
{
|
||||
Callback commitCallback = new Callback.Nested(callback)
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers != null)
|
||||
{
|
||||
transportCallback.send(new SendTrailers(getCallback(), trailers), false, c ->
|
||||
sendDataFrame(content, true, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(getCallback(), false, c ->
|
||||
sendDataFrame(content, true, true, c));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(getCallback(), false, c ->
|
||||
sendDataFrame(content, false, false, c));
|
||||
}
|
||||
}
|
||||
};
|
||||
transportCallback.send(commitCallback, true, c ->
|
||||
sendHeadersFrame(metaData, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
if (isTunnel(request, metaData))
|
||||
{
|
||||
transportCallback.send(callback, true, c ->
|
||||
sendHeadersFrame(metaData, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers != null)
|
||||
{
|
||||
transportCallback.send(new SendTrailers(callback, trailers), true, c ->
|
||||
sendHeadersFrame(metaData, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, true, c ->
|
||||
sendHeadersFrame(metaData, true, c));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, true, c ->
|
||||
sendHeadersFrame(metaData, false, c));
|
||||
}
|
||||
}
|
||||
HeadersFrame hf = headersFrame;
|
||||
DataFrame df = dataFrame;
|
||||
HeadersFrame tf = trailersFrame;
|
||||
|
||||
transportCallback.send(callback, true, c ->
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
|
||||
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||
System.lineSeparator(), HttpVersion.HTTP_2, metaData.getStatus(),
|
||||
System.lineSeparator(), metaData.getFields());
|
||||
}
|
||||
stream.send(new IStream.FrameList(hf, df, tf), c);
|
||||
});
|
||||
}
|
||||
|
||||
public void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
{
|
||||
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
|
||||
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
|
||||
if (hasContent || (lastContent && !isTunnel(request, metaData)))
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers == null)
|
||||
{
|
||||
transportCallback.send(callback, false, c ->
|
||||
sendDataFrame(content, true, true, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.failed(new IllegalStateException("committed"));
|
||||
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
|
||||
if (hasContent)
|
||||
{
|
||||
transportCallback.send(sendTrailers, false, c ->
|
||||
sendDataFrame(content, true, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
sendTrailers.succeeded();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, false, c ->
|
||||
sendDataFrame(content, false, false, c));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (hasContent || (lastContent && !isTunnel(request, metaData)))
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
HttpFields trailers = retrieveTrailers();
|
||||
if (trailers != null)
|
||||
{
|
||||
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
|
||||
if (hasContent)
|
||||
{
|
||||
transportCallback.send(sendTrailers, false, c ->
|
||||
sendDataFrame(content, true, false, c));
|
||||
}
|
||||
else
|
||||
{
|
||||
sendTrailers.succeeded();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, false, c ->
|
||||
sendDataFrame(content, true, true, c));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
transportCallback.send(callback, false, c ->
|
||||
sendDataFrame(content, false, false, c));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,20 +305,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
}, new Stream.Listener.Adapter()); // TODO: handle reset from the client ?
|
||||
}
|
||||
|
||||
private void sendHeadersFrame(MetaData.Response info, boolean endStream, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
|
||||
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
|
||||
System.lineSeparator(), info.getFields());
|
||||
}
|
||||
|
||||
HeadersFrame frame = new HeadersFrame(stream.getId(), info, null, endStream);
|
||||
stream.headers(frame, callback);
|
||||
}
|
||||
|
||||
private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
Loading…
Reference in New Issue