diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index 18ce2b3537e..10a2a0a2380 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -77,7 +77,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable } @Override - public Session getSession() + public HTTP3Session getSession() { return session; } @@ -356,6 +356,11 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable return completable; } + public boolean isClosed() + { + return closeState == CloseState.CLOSED; + } + void updateClose(boolean update, boolean local) { if (update) diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java index 8c16c4b12eb..97e49047de4 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/HTTP3ServerConnectionFactory.java @@ -42,12 +42,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF { HTTP3Stream http3Stream = (HTTP3Stream)stream; HTTP3StreamListener listener = new HTTP3StreamListener(http3Stream.getEndPoint()); - Runnable runnable = listener.onRequest(stream, frame); - if (runnable != null) - { - ServerHTTP3Session protocolSession = (ServerHTTP3Session)((HTTP3Session)http3Stream.getSession()).getProtocolSession(); - protocolSession.offer(runnable); - } + listener.onRequest(stream, frame); return listener; } } @@ -66,21 +61,51 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF return (ServerHTTP3StreamConnection)endPoint.getConnection(); } - public Runnable onRequest(Stream stream, HeadersFrame frame) + public void onRequest(Stream stream, HeadersFrame frame) { - return getConnection().onRequest((HTTP3Stream)stream, frame); - } - - @Override - public void onTrailer(Stream stream, HeadersFrame frame) - { - getConnection().onTrailer((HTTP3Stream)stream, frame); + HTTP3Stream http3Stream = (HTTP3Stream)stream; + Runnable runnable = getConnection().onRequest(http3Stream, frame); + if (runnable != null) + { + ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); + protocolSession.offer(runnable); + } } @Override public void onDataAvailable(Stream stream) { - getConnection().onDataAvailable((HTTP3Stream)stream); + HTTP3Stream http3Stream = (HTTP3Stream)stream; + Runnable runnable = getConnection().onDataAvailable(http3Stream); + if (runnable != null) + { + ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); + protocolSession.offer(runnable); + } + } + + @Override + public void onTrailer(Stream stream, HeadersFrame frame) + { + HTTP3Stream http3Stream = (HTTP3Stream)stream; + Runnable runnable = getConnection().onTrailer(http3Stream, frame); + if (runnable != null) + { + ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession(); + protocolSession.offer(runnable); + } + } + + @Override + public boolean onIdleTimeout(Stream stream, Throwable failure) + { + return getConnection().onIdleTimeout((HTTP3Stream)stream, failure); + } + + @Override + public void onFailure(Stream stream, Throwable failure) + { + getConnection().onFailure((HTTP3Stream)stream, failure); } } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java index afdb29e4d07..87816850140 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java @@ -13,7 +13,20 @@ package org.eclipse.jetty.http3.server.internal; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http.PreEncodedHttpField; +import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.http3.internal.parser.MessageParser; import org.eclipse.jetty.io.EndPoint; @@ -22,12 +35,22 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpTransport; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HttpChannelOverHTTP3 extends HttpChannel { + private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class); + + private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); + private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION); + private final HTTP3Stream stream; private final ServerHTTP3StreamConnection connection; private HttpInput.Content content; + private boolean expect100Continue; + private boolean delayedUntilContent; public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection) { @@ -36,6 +59,207 @@ public class HttpChannelOverHTTP3 extends HttpChannel this.connection = connection; } + void consumeInput() + { + getRequest().getHttpInput().consumeAll(); + } + + public Runnable onRequest(HeadersFrame frame) + { + try + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + HttpFields fields = request.getFields(); + + expect100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()); + + HttpFields.Mutable response = getResponse().getHttpFields(); + if (getHttpConfiguration().getSendServerVersion()) + response.add(SERVER_VERSION); + if (getHttpConfiguration().getSendXPoweredBy()) + response.add(POWERED_BY); + + onRequest(request); + + boolean endStream = frame.isLast(); + if (endStream) + { + onContentComplete(); + onRequestComplete(); + } + + boolean connect = request instanceof MetaData.ConnectRequest; + delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() && + !endStream && !expect100Continue && !connect; + + // Delay the demand of DATA frames for CONNECT with :protocol + // or for normal requests expecting 100 continue. + if (connect) + { + if (request.getProtocol() == null) + stream.demand(); + } + else + { + if (delayedUntilContent) + stream.demand(); + } + + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Request #{}/{}, delayed={}:{}{} {} {}{}{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + delayedUntilContent, System.lineSeparator(), + request.getMethod(), request.getURI(), request.getHttpVersion(), + System.lineSeparator(), fields); + } + + return delayedUntilContent ? null : this; + } + catch (BadMessageException x) + { + if (LOG.isDebugEnabled()) + LOG.debug("onRequest", x); + onBadMessage(x); + return null; + } + catch (Throwable x) + { + onBadMessage(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, null, x)); + return null; + } + } + + public Runnable onDataAvailable() + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(); + return null; + } + + ByteBuffer buffer = data.getByteBuffer(); + int length = buffer.remaining(); + HttpInput.Content content = new HttpInput.Content(buffer) + { + @Override + public boolean isEof() + { + return data.isLast(); + } + + @Override + public void succeeded() + { + data.complete(); + } + + @Override + public void failed(Throwable x) + { + data.complete(); + } + }; + + this.content = content; + boolean handle = onContent(content); + + boolean isLast = data.isLast(); + if (isLast) + { + boolean handleContent = onContentComplete(); + // This will generate EOF -> must happen before onContentProducible. + boolean handleRequest = onRequestComplete(); + handle |= handleContent | handleRequest; + } + + boolean woken = getRequest().getHttpInput().onContentProducible(); + handle |= woken; + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Request #{}/{}: {} bytes of {} content, woken: {}, handle: {}", + stream.getId(), + Integer.toHexString(stream.getSession().hashCode()), + length, + isLast ? "last" : "some", + woken, + handle); + } + + boolean wasDelayed = delayedUntilContent; + delayedUntilContent = false; + return handle || wasDelayed ? this : null; + } + + public Runnable onTrailer(HeadersFrame frame) + { + HttpFields trailers = frame.getMetaData().getFields(); + if (trailers.size() > 0) + onTrailers(trailers); + + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Request #{}/{}, trailers:{}{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + System.lineSeparator(), trailers); + } + + // This will generate EOF -> need to call onContentProducible. + boolean handle = onRequestComplete(); + boolean woken = getRequest().getHttpInput().onContentProducible(); + handle |= woken; + + boolean wasDelayed = delayedUntilContent; + delayedUntilContent = false; + return handle || wasDelayed ? this : null; + } + + public boolean onIdleTimeout(Throwable failure, Consumer consumer) + { + boolean delayed = delayedUntilContent; + delayedUntilContent = false; + + boolean reset = getState().isIdle(); + if (reset) + consumeInput(); + + //TODO +// getHttpTransport().onStreamTimeout(failure); + + failure.addSuppressed(new Throwable("HttpInput idle timeout")); + // TODO: writing to the content field here is at race with demand? + if (content == null) + content = new HttpInput.ErrorContent(failure); + boolean needed = getRequest().getHttpInput().onContentProducible(); + + if (needed || delayed) + { + consumer.accept(this::handleWithContext); + reset = false; + } + + return reset; + } + + private void handleWithContext() + { + ContextHandler context = getState().getContextHandler(); + if (context != null) + context.handle(getRequest(), this); + else + handle(); + } + + public void onFailure(Throwable failure) + { + //TODO +// getHttpTransport().onStreamFailure(failure); +// boolean handle = failed(failure); +// consumeInput(); +// return new FailureTask(failure, callback, handle); + } + @Override public boolean needContent() { diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpTransportOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpTransportOverHTTP3.java index 78f963fb974..4acbe54afc1 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpTransportOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpTransportOverHTTP3.java @@ -13,15 +13,28 @@ package org.eclipse.jetty.http3.server.internal; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3ErrorCode; import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.server.HttpTransport; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +42,10 @@ public class HttpTransportOverHTTP3 implements HttpTransport { private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverHTTP3.class); + private final AtomicBoolean commit = new AtomicBoolean(); + private final TransportCallback transportCallback = new TransportCallback(); private final HTTP3Stream stream; + private MetaData.Response metaData; public HttpTransportOverHTTP3(HTTP3Stream stream) { @@ -39,16 +55,194 @@ public class HttpTransportOverHTTP3 implements HttpTransport @Override public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) { - CompletableFuture future = stream.respond(new HeadersFrame(response, true)); - future.whenComplete((s, x) -> + if (response != null) + sendHeaders(request, response, content, lastContent, callback); + else + sendContent(request, content, lastContent, callback); + } + + private void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) + { + metaData = response; + + HeadersFrame headersFrame; + DataFrame dataFrame = null; + HeadersFrame trailersFrame = null; + + boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod()); + boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + int status = response.getStatus(); + boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102; + if (interimResponse) { - if (x == null) - callback.succeeded(); + // Must not commit interim responses. + if (hasContent) + { + callback.failed(new IllegalStateException("Interim response cannot have content")); + return; + } + headersFrame = new HeadersFrame(metaData, false); + } + else + { + if (commit.compareAndSet(false, true)) + { + if (lastContent) + { + long realContentLength = BufferUtil.length(content); + long contentLength = response.getContentLength(); + if (contentLength < 0) + { + metaData = new MetaData.Response( + response.getHttpVersion(), + response.getStatus(), + response.getReason(), + response.getFields(), + realContentLength, + response.getTrailerSupplier() + ); + } + else if (hasContent && contentLength != realContentLength) + { + callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength))); + return; + } + } + + if (hasContent) + { + headersFrame = new HeadersFrame(metaData, false); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + dataFrame = new DataFrame(content, true); + } + else + { + dataFrame = new DataFrame(content, false); + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true); + } + } + else + { + dataFrame = new DataFrame(content, false); + } + } + else + { + if (lastContent) + { + if (isTunnel(request, metaData)) + { + headersFrame = new HeadersFrame(metaData, false); + } + else + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + headersFrame = new HeadersFrame(metaData, true); + } + else + { + headersFrame = new HeadersFrame(metaData, false); + trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true); + } + } + } + else + { + headersFrame = new HeadersFrame(metaData, false); + } + } + } else - callback.failed(x); + { + callback.failed(new IllegalStateException("committed")); + return; + } + } + + HeadersFrame hf = headersFrame; + DataFrame df = dataFrame; + HeadersFrame tf = trailersFrame; + + transportCallback.send(callback, true, c -> + { + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Response #{}/{}:{}{} {}{}{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + System.lineSeparator(), HttpVersion.HTTP_3, metaData.getStatus(), + System.lineSeparator(), metaData.getFields()); + } + CompletableFuture cf = stream.respond(hf); + if (df != null) + cf = cf.thenCompose(s -> s.data(df)); + if (tf != null) + cf = cf.thenCompose(s -> s.trailer(tf)); + c.completeWith(cf); }); } + private void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback) + { + boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod()); + boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest; + if (hasContent || (lastContent && !isTunnel(request, metaData))) + { + if (lastContent) + { + HttpFields trailers = retrieveTrailers(); + if (trailers == null) + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, true, true, c)); + } + else + { + SendTrailers sendTrailers = new SendTrailers(callback, trailers); + if (hasContent) + { + transportCallback.send(sendTrailers, false, c -> + sendDataFrame(content, true, false, c)); + } + else + { + sendTrailers.succeeded(); + } + } + } + else + { + transportCallback.send(callback, false, c -> + sendDataFrame(content, false, false, c)); + } + } + else + { + callback.succeeded(); + } + } + + private HttpFields retrieveTrailers() + { + Supplier supplier = metaData.getTrailerSupplier(); + if (supplier == null) + return null; + HttpFields trailers = supplier.get(); + if (trailers == null) + return null; + return trailers.size() == 0 ? null : trailers; + } + + private boolean isTunnel(MetaData.Request request, MetaData.Response response) + { + return HttpMethod.CONNECT.is(request.getMethod()) && response.getStatus() == HttpStatus.OK_200; + } + @Override public boolean isPushSupported() { @@ -58,18 +252,252 @@ public class HttpTransportOverHTTP3 implements HttpTransport @Override public void push(MetaData.Request request) { - + // TODO implement } @Override public void onCompleted() { + Object attachment = stream.getAttachment(); + if (attachment instanceof HttpChannelOverHTTP3) + { + // If the stream is not closed, it is still reading the request content. + // Send a reset to the other end so that it stops sending data. + if (!stream.isClosed()) + { + if (LOG.isDebugEnabled()) + LOG.debug("HTTP3 Response #{}: unconsumed request content, resetting stream", stream.getId()); + stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), new IOException("unconsumed content")); + } + // Consume the existing queued data frames to + // avoid stalling the session flow control. + HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)attachment; + channel.consumeInput(); + } } @Override public void abort(Throwable failure) { + if (LOG.isDebugEnabled()) + LOG.debug("HTTP3 Response #{}/{} aborted", stream.getId(), Integer.toHexString(stream.getSession().hashCode())); + stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure); + } + private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Response #{}/{}: {} content bytes{}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + content.remaining(), lastContent ? " (last chunk)" : ""); + } + DataFrame frame = new DataFrame(content, endStream); + callback.completeWith(stream.data(frame)); + } + + private void sendTrailerFrame(MetaData metaData, Callback callback) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("HTTP3 Response #{}/{}: trailer", + stream.getId(), Integer.toHexString(stream.getSession().hashCode())); + } + + HeadersFrame frame = new HeadersFrame(metaData, true); + callback.completeWith(stream.trailer(frame)); + } + + /** + *

Send states for {@link TransportCallback}.

+ * + * @see TransportCallback + */ + private enum State + { + /** + *

No send initiated or in progress.

+ */ + IDLE, + /** + *

A send is initiated and possibly in progress.

+ */ + SENDING, + /** + *

The terminal state indicating failure of the send.

+ */ + FAILED + } + + /** + *

Callback that controls sends initiated by the transport, by eventually + * notifying a nested callback.

+ *

There are 3 sources of concurrency after a send is initiated:

+ *
    + *
  • the completion of the send operation, either success or failure
  • + *
  • an asynchronous failure coming from the read side such as a stream + * being reset, or the connection being closed
  • + *
  • an asynchronous idle timeout
  • + *
+ * + * @see State + */ + private class TransportCallback implements Callback + { + private final AutoLock _lock = new AutoLock(); + private State _state = State.IDLE; + private Callback _callback; + private boolean _commit; + private Throwable _failure; + + private void reset(Throwable failure) + { + assert _lock.isHeldByCurrentThread(); + _state = failure != null ? State.FAILED : State.IDLE; + _callback = null; + _commit = false; + _failure = failure; + } + + private void send(Callback callback, boolean commit, Consumer sendFrame) + { + Throwable failure = sending(callback, commit); + if (failure == null) + sendFrame.accept(this); + else + callback.failed(failure); + } + + private void abort(Throwable failure) + { + failed(failure); + } + + private Throwable sending(Callback callback, boolean commit) + { + try (AutoLock l = _lock.lock()) + { + switch (_state) + { + case IDLE: + { + _state = State.SENDING; + _callback = callback; + _commit = commit; + return null; + } + case FAILED: + { + return _failure; + } + default: + { + return new IllegalStateException("Invalid transport state: " + _state); + } + } + } + } + + @Override + public void succeeded() + { + Callback callback; + boolean commit; + try (AutoLock l = _lock.lock()) + { + if (_state != State.SENDING) + { + // This thread lost the race to succeed the current + // send, as other threads likely already failed it. + return; + } + callback = _callback; + commit = _commit; + reset(null); + } + if (LOG.isDebugEnabled()) + LOG.debug("HTTP3 Response #{}/{} {} success", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush"); + callback.succeeded(); + } + + @Override + public void failed(Throwable failure) + { + Callback callback; + boolean commit; + try (AutoLock l = _lock.lock()) + { + if (_state != State.SENDING) + { + reset(failure); + return; + } + callback = _callback; + commit = _commit; + reset(failure); + } + if (LOG.isDebugEnabled()) + LOG.debug("HTTP3 Response #{}/{} {} failure", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + commit ? "commit" : "flush", + failure); + callback.failed(failure); + } + + private boolean idleTimeout(Throwable failure) + { + Callback callback = null; + try (AutoLock l = _lock.lock()) + { + // Ignore idle timeouts if not writing, + // as the application may be suspended. + if (_state == State.SENDING) + { + callback = _callback; + reset(failure); + } + } + boolean timeout = callback != null; + if (LOG.isDebugEnabled()) + LOG.debug("HTTP3 Response #{}/{} idle timeout {}", + stream.getId(), Integer.toHexString(stream.getSession().hashCode()), + timeout ? "expired" : "ignored", + failure); + if (timeout) + callback.failed(failure); + return timeout; + } + + @Override + public InvocationType getInvocationType() + { + Callback callback; + try (AutoLock l = _lock.lock()) + { + callback = _callback; + } + return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); + } + } + + private class SendTrailers extends Callback.Nested + { + private final HttpFields trailers; + + private SendTrailers(Callback callback, HttpFields trailers) + { + super(callback); + this.trailers = trailers; + } + + @Override + public void succeeded() + { + transportCallback.send(getCallback(), false, c -> + sendTrailerFrame(new MetaData(HttpVersion.HTTP_2, trailers), c)); + } } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java index 7d351d98f16..c6acc349310 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.http3.server.internal; -import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.http3.internal.HTTP3StreamConnection; @@ -47,22 +46,32 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame) { HttpTransport transport = new HttpTransportOverHTTP3(stream); - HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this); + HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this); stream.setAttachment(channel); - channel.onRequest(((MetaData.Request)frame.getMetaData())); - return channel; + return channel.onRequest(frame); } - public void onDataAvailable(HTTP3Stream stream) + public Runnable onDataAvailable(HTTP3Stream stream) { - HttpChannel channel = (HttpChannel)stream.getAttachment(); - if (channel.getRequest().getHttpInput().onContentProducible()) - channel.handle(); + HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment(); + return channel.onDataAvailable(); } - public void onTrailer(HTTP3Stream stream, HeadersFrame frame) + public Runnable onTrailer(HTTP3Stream stream, HeadersFrame frame) { - HttpChannel channel = (HttpChannel)stream.getAttachment(); - channel.onTrailers(frame.getMetaData().getFields()); + HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment(); + return channel.onTrailer(frame); + } + + public boolean onIdleTimeout(HTTP3Stream stream, Throwable failure) + { + HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment(); + return channel.onIdleTimeout(failure, null); // TODO + } + + public void onFailure(HTTP3Stream stream, Throwable failure) + { + HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment(); + channel.onFailure(failure); } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java index 58892521525..2d7f5a931b7 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java @@ -14,29 +14,39 @@ package org.eclipse.jetty.http3.tests; import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.IO; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public class HandlerClientServerTest extends AbstractClientServerTest { @Test - public void test() throws Exception + public void testGet() throws Exception { CountDownLatch serverLatch = new CountDownLatch(1); start(new AbstractHandler() @@ -68,4 +78,82 @@ public class HandlerClientServerTest extends AbstractClientServerTest assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS)); } + + @Disabled + @Test + public void testPost() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + serverLatch.countDown(); + } + }); + + Session.Client session = newSession(new Session.Client.Listener() {}); + + List clientReceivedBuffers = new ArrayList<>(); + + CountDownLatch clientResponseLatch = new CountDownLatch(1); + HeadersFrame frame = new HeadersFrame(newRequest(HttpMethod.POST, "/"), false); + Stream stream = session.newRequest(frame, new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertThat(response.getStatus(), is(HttpStatus.OK_200)); + stream.demand(); + } + + @Override + public void onDataAvailable(Stream stream) + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(); + return; + } + + ByteBuffer byteBuffer = data.getByteBuffer(); + ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining()); + copy.put(byteBuffer); + copy.flip(); + clientReceivedBuffers.add(copy); + data.complete(); + + if (data.isLast()) + { + clientResponseLatch.countDown(); + return; + } + + stream.demand(); + } + }) + .get(5, TimeUnit.SECONDS); + + byte[] bytes = new byte[16 * 1024 * 1024]; + new Random().nextBytes(bytes); + stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false)) + .thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length), true))) + .get(555, TimeUnit.SECONDS); + + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS)); + + int sum = clientReceivedBuffers.stream().mapToInt(Buffer::remaining).sum(); + assertThat(sum, is(bytes.length)); + + byte[] mirroredBytes = new byte[sum]; + ByteBuffer clientBuffer = ByteBuffer.wrap(mirroredBytes); + clientReceivedBuffers.forEach(clientBuffer::put); + assertArrayEquals(bytes, mirroredBytes); + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 2b47e2d848a..a1f3d1319d4 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -39,6 +39,17 @@ public interface Callback extends Invocable } }; + default void completeWith(CompletableFuture cf) + { + cf.whenComplete((o, x) -> + { + if (x == null) + succeeded(); + else + failed(x); + }); + } + /** *

Completes this callback with the given {@link CompletableFuture}.

*

When the CompletableFuture completes normally, this callback is succeeded;