WIP: implementation of server's upper layers input and output

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-10-14 12:58:38 +02:00 committed by Simone Bordet
parent a0399a2e30
commit 4c66c02176
7 changed files with 824 additions and 34 deletions

View File

@ -77,7 +77,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
}
@Override
public Session getSession()
public HTTP3Session getSession()
{
return session;
}
@ -356,6 +356,11 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
return completable;
}
public boolean isClosed()
{
return closeState == CloseState.CLOSED;
}
void updateClose(boolean update, boolean local)
{
if (update)

View File

@ -42,12 +42,7 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
HTTP3StreamListener listener = new HTTP3StreamListener(http3Stream.getEndPoint());
Runnable runnable = listener.onRequest(stream, frame);
if (runnable != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)((HTTP3Session)http3Stream.getSession()).getProtocolSession();
protocolSession.offer(runnable);
}
listener.onRequest(stream, frame);
return listener;
}
}
@ -66,21 +61,51 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
return (ServerHTTP3StreamConnection)endPoint.getConnection();
}
public Runnable onRequest(Stream stream, HeadersFrame frame)
public void onRequest(Stream stream, HeadersFrame frame)
{
return getConnection().onRequest((HTTP3Stream)stream, frame);
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable runnable = getConnection().onRequest(http3Stream, frame);
if (runnable != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(runnable);
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
{
getConnection().onTrailer((HTTP3Stream)stream, frame);
}
@Override
public void onDataAvailable(Stream stream)
{
getConnection().onDataAvailable((HTTP3Stream)stream);
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable runnable = getConnection().onDataAvailable(http3Stream);
if (runnable != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(runnable);
}
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable runnable = getConnection().onTrailer(http3Stream, frame);
if (runnable != null)
{
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(runnable);
}
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
{
return getConnection().onIdleTimeout((HTTP3Stream)stream, failure);
}
@Override
public void onFailure(Stream stream, Throwable failure)
{
getConnection().onFailure((HTTP3Stream)stream, failure);
}
}
}

View File

@ -13,7 +13,20 @@
package org.eclipse.jetty.http3.server.internal;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.EndPoint;
@ -22,12 +35,22 @@ import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpChannelOverHTTP3 extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
private final HTTP3Stream stream;
private final ServerHTTP3StreamConnection connection;
private HttpInput.Content content;
private boolean expect100Continue;
private boolean delayedUntilContent;
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection)
{
@ -36,6 +59,207 @@ public class HttpChannelOverHTTP3 extends HttpChannel
this.connection = connection;
}
void consumeInput()
{
getRequest().getHttpInput().consumeAll();
}
public Runnable onRequest(HeadersFrame frame)
{
try
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
HttpFields fields = request.getFields();
expect100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
HttpFields.Mutable response = getResponse().getHttpFields();
if (getHttpConfiguration().getSendServerVersion())
response.add(SERVER_VERSION);
if (getHttpConfiguration().getSendXPoweredBy())
response.add(POWERED_BY);
onRequest(request);
boolean endStream = frame.isLast();
if (endStream)
{
onContentComplete();
onRequestComplete();
}
boolean connect = request instanceof MetaData.ConnectRequest;
delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !expect100Continue && !connect;
// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
if (connect)
{
if (request.getProtocol() == null)
stream.demand();
}
else
{
if (delayedUntilContent)
stream.demand();
}
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Request #{}/{}, delayed={}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), fields);
}
return delayedUntilContent ? null : this;
}
catch (BadMessageException x)
{
if (LOG.isDebugEnabled())
LOG.debug("onRequest", x);
onBadMessage(x);
return null;
}
catch (Throwable x)
{
onBadMessage(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, null, x));
return null;
}
}
public Runnable onDataAvailable()
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return null;
}
ByteBuffer buffer = data.getByteBuffer();
int length = buffer.remaining();
HttpInput.Content content = new HttpInput.Content(buffer)
{
@Override
public boolean isEof()
{
return data.isLast();
}
@Override
public void succeeded()
{
data.complete();
}
@Override
public void failed(Throwable x)
{
data.complete();
}
};
this.content = content;
boolean handle = onContent(content);
boolean isLast = data.isLast();
if (isLast)
{
boolean handleContent = onContentComplete();
// This will generate EOF -> must happen before onContentProducible.
boolean handleRequest = onRequestComplete();
handle |= handleContent | handleRequest;
}
boolean woken = getRequest().getHttpInput().onContentProducible();
handle |= woken;
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Request #{}/{}: {} bytes of {} content, woken: {}, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
isLast ? "last" : "some",
woken,
handle);
}
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
return handle || wasDelayed ? this : null;
}
public Runnable onTrailer(HeadersFrame frame)
{
HttpFields trailers = frame.getMetaData().getFields();
if (trailers.size() > 0)
onTrailers(trailers);
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Request #{}/{}, trailers:{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), trailers);
}
// This will generate EOF -> need to call onContentProducible.
boolean handle = onRequestComplete();
boolean woken = getRequest().getHttpInput().onContentProducible();
handle |= woken;
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
return handle || wasDelayed ? this : null;
}
public boolean onIdleTimeout(Throwable failure, Consumer<Runnable> consumer)
{
boolean delayed = delayedUntilContent;
delayedUntilContent = false;
boolean reset = getState().isIdle();
if (reset)
consumeInput();
//TODO
// getHttpTransport().onStreamTimeout(failure);
failure.addSuppressed(new Throwable("HttpInput idle timeout"));
// TODO: writing to the content field here is at race with demand?
if (content == null)
content = new HttpInput.ErrorContent(failure);
boolean needed = getRequest().getHttpInput().onContentProducible();
if (needed || delayed)
{
consumer.accept(this::handleWithContext);
reset = false;
}
return reset;
}
private void handleWithContext()
{
ContextHandler context = getState().getContextHandler();
if (context != null)
context.handle(getRequest(), this);
else
handle();
}
public void onFailure(Throwable failure)
{
//TODO
// getHttpTransport().onStreamFailure(failure);
// boolean handle = failed(failure);
// consumeInput();
// return new FailureTask(failure, callback, handle);
}
@Override
public boolean needContent()
{

View File

@ -13,15 +13,28 @@
package org.eclipse.jetty.http3.server.internal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,7 +42,10 @@ public class HttpTransportOverHTTP3 implements HttpTransport
{
private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverHTTP3.class);
private final AtomicBoolean commit = new AtomicBoolean();
private final TransportCallback transportCallback = new TransportCallback();
private final HTTP3Stream stream;
private MetaData.Response metaData;
public HttpTransportOverHTTP3(HTTP3Stream stream)
{
@ -39,16 +55,194 @@ public class HttpTransportOverHTTP3 implements HttpTransport
@Override
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
CompletableFuture<Stream> future = stream.respond(new HeadersFrame(response, true));
future.whenComplete((s, x) ->
{
if (x == null)
callback.succeeded();
if (response != null)
sendHeaders(request, response, content, lastContent, callback);
else
callback.failed(x);
sendContent(request, content, lastContent, callback);
}
private void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
metaData = response;
HeadersFrame headersFrame;
DataFrame dataFrame = null;
HeadersFrame trailersFrame = null;
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
int status = response.getStatus();
boolean interimResponse = status == HttpStatus.CONTINUE_100 || status == HttpStatus.PROCESSING_102;
if (interimResponse)
{
// Must not commit interim responses.
if (hasContent)
{
callback.failed(new IllegalStateException("Interim response cannot have content"));
return;
}
headersFrame = new HeadersFrame(metaData, false);
}
else
{
if (commit.compareAndSet(false, true))
{
if (lastContent)
{
long realContentLength = BufferUtil.length(content);
long contentLength = response.getContentLength();
if (contentLength < 0)
{
metaData = new MetaData.Response(
response.getHttpVersion(),
response.getStatus(),
response.getReason(),
response.getFields(),
realContentLength,
response.getTrailerSupplier()
);
}
else if (hasContent && contentLength != realContentLength)
{
callback.failed(new BadMessageException(HttpStatus.INTERNAL_SERVER_ERROR_500, String.format("Incorrect Content-Length %d!=%d", contentLength, realContentLength)));
return;
}
}
if (hasContent)
{
headersFrame = new HeadersFrame(metaData, false);
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
dataFrame = new DataFrame(content, true);
}
else
{
dataFrame = new DataFrame(content, false);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true);
}
}
else
{
dataFrame = new DataFrame(content, false);
}
}
else
{
if (lastContent)
{
if (isTunnel(request, metaData))
{
headersFrame = new HeadersFrame(metaData, false);
}
else
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
headersFrame = new HeadersFrame(metaData, true);
}
else
{
headersFrame = new HeadersFrame(metaData, false);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true);
}
}
}
else
{
headersFrame = new HeadersFrame(metaData, false);
}
}
}
else
{
callback.failed(new IllegalStateException("committed"));
return;
}
}
HeadersFrame hf = headersFrame;
DataFrame df = dataFrame;
HeadersFrame tf = trailersFrame;
transportCallback.send(callback, true, c ->
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Response #{}/{}:{}{} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), HttpVersion.HTTP_3, metaData.getStatus(),
System.lineSeparator(), metaData.getFields());
}
CompletableFuture<Stream> cf = stream.respond(hf);
if (df != null)
cf = cf.thenCompose(s -> s.data(df));
if (tf != null)
cf = cf.thenCompose(s -> s.trailer(tf));
c.completeWith(cf);
});
}
private void sendContent(MetaData.Request request, ByteBuffer content, boolean lastContent, Callback callback)
{
boolean isHeadRequest = HttpMethod.HEAD.is(request.getMethod());
boolean hasContent = BufferUtil.hasContent(content) && !isHeadRequest;
if (hasContent || (lastContent && !isTunnel(request, metaData)))
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers();
if (trailers == null)
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, true, true, c));
}
else
{
SendTrailers sendTrailers = new SendTrailers(callback, trailers);
if (hasContent)
{
transportCallback.send(sendTrailers, false, c ->
sendDataFrame(content, true, false, c));
}
else
{
sendTrailers.succeeded();
}
}
}
else
{
transportCallback.send(callback, false, c ->
sendDataFrame(content, false, false, c));
}
}
else
{
callback.succeeded();
}
}
private HttpFields retrieveTrailers()
{
Supplier<HttpFields> supplier = metaData.getTrailerSupplier();
if (supplier == null)
return null;
HttpFields trailers = supplier.get();
if (trailers == null)
return null;
return trailers.size() == 0 ? null : trailers;
}
private boolean isTunnel(MetaData.Request request, MetaData.Response response)
{
return HttpMethod.CONNECT.is(request.getMethod()) && response.getStatus() == HttpStatus.OK_200;
}
@Override
public boolean isPushSupported()
{
@ -58,18 +252,252 @@ public class HttpTransportOverHTTP3 implements HttpTransport
@Override
public void push(MetaData.Request request)
{
// TODO implement
}
@Override
public void onCompleted()
{
Object attachment = stream.getAttachment();
if (attachment instanceof HttpChannelOverHTTP3)
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
if (!stream.isClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}: unconsumed request content, resetting stream", stream.getId());
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), new IOException("unconsumed content"));
}
// Consume the existing queued data frames to
// avoid stalling the session flow control.
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)attachment;
channel.consumeInput();
}
}
@Override
public void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} aborted", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure);
}
private void sendDataFrame(ByteBuffer content, boolean lastContent, boolean endStream, Callback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Response #{}/{}: {} content bytes{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
content.remaining(), lastContent ? " (last chunk)" : "");
}
DataFrame frame = new DataFrame(content, endStream);
callback.completeWith(stream.data(frame));
}
private void sendTrailerFrame(MetaData metaData, Callback callback)
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Response #{}/{}: trailer",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
}
HeadersFrame frame = new HeadersFrame(metaData, true);
callback.completeWith(stream.trailer(frame));
}
/**
* <p>Send states for {@link TransportCallback}.</p>
*
* @see TransportCallback
*/
private enum State
{
/**
* <p>No send initiated or in progress.</p>
*/
IDLE,
/**
* <p>A send is initiated and possibly in progress.</p>
*/
SENDING,
/**
* <p>The terminal state indicating failure of the send.</p>
*/
FAILED
}
/**
* <p>Callback that controls sends initiated by the transport, by eventually
* notifying a nested callback.</p>
* <p>There are 3 sources of concurrency after a send is initiated:</p>
* <ul>
* <li>the completion of the send operation, either success or failure</li>
* <li>an asynchronous failure coming from the read side such as a stream
* being reset, or the connection being closed</li>
* <li>an asynchronous idle timeout</li>
* </ul>
*
* @see State
*/
private class TransportCallback implements Callback
{
private final AutoLock _lock = new AutoLock();
private State _state = State.IDLE;
private Callback _callback;
private boolean _commit;
private Throwable _failure;
private void reset(Throwable failure)
{
assert _lock.isHeldByCurrentThread();
_state = failure != null ? State.FAILED : State.IDLE;
_callback = null;
_commit = false;
_failure = failure;
}
private void send(Callback callback, boolean commit, Consumer<Callback> sendFrame)
{
Throwable failure = sending(callback, commit);
if (failure == null)
sendFrame.accept(this);
else
callback.failed(failure);
}
private void abort(Throwable failure)
{
failed(failure);
}
private Throwable sending(Callback callback, boolean commit)
{
try (AutoLock l = _lock.lock())
{
switch (_state)
{
case IDLE:
{
_state = State.SENDING;
_callback = callback;
_commit = commit;
return null;
}
case FAILED:
{
return _failure;
}
default:
{
return new IllegalStateException("Invalid transport state: " + _state);
}
}
}
}
@Override
public void succeeded()
{
Callback callback;
boolean commit;
try (AutoLock l = _lock.lock())
{
if (_state != State.SENDING)
{
// This thread lost the race to succeed the current
// send, as other threads likely already failed it.
return;
}
callback = _callback;
commit = _commit;
reset(null);
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} {} success",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush");
callback.succeeded();
}
@Override
public void failed(Throwable failure)
{
Callback callback;
boolean commit;
try (AutoLock l = _lock.lock())
{
if (_state != State.SENDING)
{
reset(failure);
return;
}
callback = _callback;
commit = _commit;
reset(failure);
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} {} failure",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
commit ? "commit" : "flush",
failure);
callback.failed(failure);
}
private boolean idleTimeout(Throwable failure)
{
Callback callback = null;
try (AutoLock l = _lock.lock())
{
// Ignore idle timeouts if not writing,
// as the application may be suspended.
if (_state == State.SENDING)
{
callback = _callback;
reset(failure);
}
}
boolean timeout = callback != null;
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} idle timeout {}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
timeout ? "expired" : "ignored",
failure);
if (timeout)
callback.failed(failure);
return timeout;
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
try (AutoLock l = _lock.lock())
{
callback = _callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
}
private class SendTrailers extends Callback.Nested
{
private final HttpFields trailers;
private SendTrailers(Callback callback, HttpFields trailers)
{
super(callback);
this.trailers = trailers;
}
@Override
public void succeeded()
{
transportCallback.send(getCallback(), false, c ->
sendTrailerFrame(new MetaData(HttpVersion.HTTP_2, trailers), c));
}
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
@ -47,22 +46,32 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame)
{
HttpTransport transport = new HttpTransportOverHTTP3(stream);
HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
stream.setAttachment(channel);
channel.onRequest(((MetaData.Request)frame.getMetaData()));
return channel;
return channel.onRequest(frame);
}
public void onDataAvailable(HTTP3Stream stream)
public Runnable onDataAvailable(HTTP3Stream stream)
{
HttpChannel channel = (HttpChannel)stream.getAttachment();
if (channel.getRequest().getHttpInput().onContentProducible())
channel.handle();
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
return channel.onDataAvailable();
}
public void onTrailer(HTTP3Stream stream, HeadersFrame frame)
public Runnable onTrailer(HTTP3Stream stream, HeadersFrame frame)
{
HttpChannel channel = (HttpChannel)stream.getAttachment();
channel.onTrailers(frame.getMetaData().getFields());
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
return channel.onTrailer(frame);
}
public boolean onIdleTimeout(HTTP3Stream stream, Throwable failure)
{
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
return channel.onIdleTimeout(failure, null); // TODO
}
public void onFailure(HTTP3Stream stream, Throwable failure)
{
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
channel.onFailure(failure);
}
}

View File

@ -14,29 +14,39 @@
package org.eclipse.jetty.http3.tests;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HandlerClientServerTest extends AbstractClientServerTest
{
@Test
public void test() throws Exception
public void testGet() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
@ -68,4 +78,82 @@ public class HandlerClientServerTest extends AbstractClientServerTest
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
}
@Disabled
@Test
public void testPost() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
serverLatch.countDown();
}
});
Session.Client session = newSession(new Session.Client.Listener() {});
List<ByteBuffer> clientReceivedBuffers = new ArrayList<>();
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HeadersFrame frame = new HeadersFrame(newRequest(HttpMethod.POST, "/"), false);
Stream stream = session.newRequest(frame, new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertThat(response.getStatus(), is(HttpStatus.OK_200));
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
ByteBuffer byteBuffer = data.getByteBuffer();
ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
copy.put(byteBuffer);
copy.flip();
clientReceivedBuffers.add(copy);
data.complete();
if (data.isLast())
{
clientResponseLatch.countDown();
return;
}
stream.demand();
}
})
.get(5, TimeUnit.SECONDS);
byte[] bytes = new byte[16 * 1024 * 1024];
new Random().nextBytes(bytes);
stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false))
.thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length), true)))
.get(555, TimeUnit.SECONDS);
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
int sum = clientReceivedBuffers.stream().mapToInt(Buffer::remaining).sum();
assertThat(sum, is(bytes.length));
byte[] mirroredBytes = new byte[sum];
ByteBuffer clientBuffer = ByteBuffer.wrap(mirroredBytes);
clientReceivedBuffers.forEach(clientBuffer::put);
assertArrayEquals(bytes, mirroredBytes);
}
}

View File

@ -39,6 +39,17 @@ public interface Callback extends Invocable
}
};
default void completeWith(CompletableFuture<?> cf)
{
cf.whenComplete((o, x) ->
{
if (x == null)
succeeded();
else
failed(x);
});
}
/**
* <p>Completes this callback with the given {@link CompletableFuture}.</p>
* <p>When the CompletableFuture completes normally, this callback is succeeded;