prototype HTTP3 upper layers
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
16a91b04a8
commit
c430226bfa
|
@ -25,13 +25,14 @@ import org.eclipse.jetty.http3.frames.Frame;
|
|||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.io.CyclicTimeouts;
|
||||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Stream.class);
|
||||
|
||||
|
@ -43,6 +44,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
|||
private FrameState frameState = FrameState.INITIAL;
|
||||
private long idleTimeout;
|
||||
private long expireNanoTime;
|
||||
private Object attachment;
|
||||
|
||||
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
|
||||
{
|
||||
|
@ -51,6 +53,23 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
|||
this.local = local;
|
||||
}
|
||||
|
||||
public QuicStreamEndPoint getEndPoint()
|
||||
{
|
||||
return endPoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return attachment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object attachment)
|
||||
{
|
||||
this.attachment = attachment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getId()
|
||||
{
|
||||
|
|
|
@ -275,7 +275,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
}
|
||||
}
|
||||
|
||||
private MessageParser.Result parseAndFill()
|
||||
public MessageParser.Result parseAndFill()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -355,6 +355,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
}
|
||||
}
|
||||
|
||||
public DataFrame pollContent()
|
||||
{
|
||||
return dataFrames.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toConnectionString()
|
||||
{
|
||||
|
|
|
@ -48,6 +48,11 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
protected Session.Server.Listener getListener()
|
||||
{
|
||||
return listener;
|
||||
}
|
||||
|
||||
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
|
||||
public boolean isUseInputDirectByteBuffers()
|
||||
{
|
||||
|
@ -113,6 +118,6 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
|
|||
long streamId = streamEndPoint.getStreamId();
|
||||
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
|
||||
MessageParser parser = new MessageParser(http3Session.getSessionServer(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
|
||||
return new ServerHTTP3StreamConnection(streamEndPoint, http3Session, parser);
|
||||
return new ServerHTTP3StreamConnection(connector, getHttpConfiguration(), streamEndPoint, http3Session, parser);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,12 @@
|
|||
|
||||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
|
||||
public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
|
||||
|
@ -24,6 +30,50 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
|
||||
public HTTP3ServerConnectionFactory(HttpConfiguration configuration)
|
||||
{
|
||||
super(configuration, null);
|
||||
super(configuration, new HTTP3SessionListener());
|
||||
}
|
||||
|
||||
private static class HTTP3SessionListener implements Session.Server.Listener
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HTTP3StreamListener listener = new HTTP3StreamListener(((HTTP3Stream)stream).getEndPoint());
|
||||
listener.onRequest(stream, frame);
|
||||
// TODO get a runnable to feed EWYK? See ProtocolSession.processReadableStreams()
|
||||
return listener;
|
||||
}
|
||||
}
|
||||
|
||||
private static class HTTP3StreamListener implements Stream.Listener
|
||||
{
|
||||
private final EndPoint endPoint;
|
||||
|
||||
public HTTP3StreamListener(EndPoint endPoint)
|
||||
{
|
||||
this.endPoint = endPoint;
|
||||
}
|
||||
|
||||
private ServerHTTP3StreamConnection getConnection()
|
||||
{
|
||||
return (ServerHTTP3StreamConnection)endPoint.getConnection();
|
||||
}
|
||||
|
||||
public void onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
getConnection().onRequest((HTTP3Stream)stream, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrailer(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
getConnection().onTrailer((HTTP3Stream)stream, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
getConnection().onDataAvailable((HTTP3Stream)stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.server.internal;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.http3.internal.parser.MessageParser;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpInput;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
|
||||
public class HttpChannelOverHTTP3 extends HttpChannel
|
||||
{
|
||||
private final HTTP3Stream stream;
|
||||
private final ServerHTTP3StreamConnection connection;
|
||||
private HttpInput.Content content;
|
||||
|
||||
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection)
|
||||
{
|
||||
super(connector, configuration, endPoint, transport);
|
||||
this.stream = stream;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needContent()
|
||||
{
|
||||
if (content != null)
|
||||
return true;
|
||||
|
||||
MessageParser.Result result = connection.parseAndFill();
|
||||
if (result == MessageParser.Result.FRAME)
|
||||
{
|
||||
DataFrame dataFrame = connection.pollContent();
|
||||
content = new HttpInput.Content(dataFrame.getByteBuffer())
|
||||
{
|
||||
@Override
|
||||
public boolean isEof()
|
||||
{
|
||||
return dataFrame.isLast();
|
||||
}
|
||||
};
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
stream.demand();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpInput.Content produceContent()
|
||||
{
|
||||
HttpInput.Content result = content;
|
||||
if (result != null && !result.isSpecial())
|
||||
content = result.isEof() ? new HttpInput.EofContent() : null;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean failAllContent(Throwable failure)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean failed(Throwable failure)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean eof()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.server.internal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HttpTransportOverHTTP3 implements HttpTransport
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverHTTP3.class);
|
||||
|
||||
private final HTTP3Stream stream;
|
||||
|
||||
public HttpTransportOverHTTP3(HTTP3Stream stream)
|
||||
{
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
{
|
||||
CompletableFuture<Stream> future = stream.respond(new HeadersFrame(response, true));
|
||||
future.whenComplete((s, x) ->
|
||||
{
|
||||
if (x == null)
|
||||
callback.succeeded();
|
||||
else
|
||||
callback.failed(x);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPushSupported()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void push(MetaData.Request request)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(Throwable failure)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
|
@ -13,17 +13,28 @@
|
|||
|
||||
package org.eclipse.jetty.http3.server.internal;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
|
||||
import org.eclipse.jetty.http3.internal.parser.MessageParser;
|
||||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
|
||||
public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
||||
{
|
||||
private final Connector connector;
|
||||
private final HttpConfiguration httpConfiguration;
|
||||
private final ServerHTTP3Session http3Session;
|
||||
|
||||
public ServerHTTP3StreamConnection(QuicStreamEndPoint endPoint, ServerHTTP3Session http3Session, MessageParser parser)
|
||||
public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session http3Session, MessageParser parser)
|
||||
{
|
||||
super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
|
||||
super(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
|
||||
this.connector = connector;
|
||||
this.httpConfiguration = httpConfiguration;
|
||||
this.http3Session = http3Session;
|
||||
}
|
||||
|
||||
|
@ -32,4 +43,27 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
|||
{
|
||||
http3Session.onDataAvailable(streamId);
|
||||
}
|
||||
|
||||
public void onRequest(HTTP3Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HttpTransport transport = new HttpTransportOverHTTP3(stream);
|
||||
HttpChannel channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
|
||||
stream.setAttachment(channel);
|
||||
// TODO create a Runnable and feed EWYK
|
||||
channel.onRequest(((MetaData.Request)frame.getMetaData()));
|
||||
channel.handle();
|
||||
}
|
||||
|
||||
public void onDataAvailable(HTTP3Stream stream)
|
||||
{
|
||||
HttpChannel channel = (HttpChannel)stream.getAttachment();
|
||||
if (channel.getRequest().getHttpInput().onContentProducible())
|
||||
channel.handle();
|
||||
}
|
||||
|
||||
public void onTrailer(HTTP3Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HttpChannel channel = (HttpChannel)stream.getAttachment();
|
||||
channel.onTrailers(frame.getMetaData().getFields());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,8 +23,11 @@ import org.eclipse.jetty.http.HttpVersion;
|
|||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.client.HTTP3Client;
|
||||
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
|
||||
import org.eclipse.jetty.quic.server.QuicServerConnector;
|
||||
import org.eclipse.jetty.server.ConnectionFactory;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
@ -42,6 +45,14 @@ public class AbstractClientServerTest
|
|||
protected HTTP3Client client;
|
||||
protected Server server;
|
||||
|
||||
protected void start(Handler handler) throws Exception
|
||||
{
|
||||
prepareServer(new HTTP3ServerConnectionFactory());
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
startClient();
|
||||
}
|
||||
|
||||
protected void start(Session.Server.Listener listener) throws Exception
|
||||
{
|
||||
startServer(listener);
|
||||
|
@ -49,6 +60,12 @@ public class AbstractClientServerTest
|
|||
}
|
||||
|
||||
protected void startServer(Session.Server.Listener listener) throws Exception
|
||||
{
|
||||
prepareServer(new RawHTTP3ServerConnectionFactory(listener));
|
||||
server.start();
|
||||
}
|
||||
|
||||
private void prepareServer(ConnectionFactory serverConnectionFactory)
|
||||
{
|
||||
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
|
||||
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
|
||||
|
@ -56,9 +73,8 @@ public class AbstractClientServerTest
|
|||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
serverThreads.setName("server");
|
||||
server = new Server(serverThreads);
|
||||
connector = new QuicServerConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
|
||||
connector = new QuicServerConnector(server, sslContextFactory, serverConnectionFactory);
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
}
|
||||
|
||||
protected void startClient() throws Exception
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HandlerClientServerTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void test() throws Exception
|
||||
{
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
jettyRequest.setHandled(true);
|
||||
serverLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
CountDownLatch clientResponseLatch = new CountDownLatch(1);
|
||||
HeadersFrame frame = new HeadersFrame(newRequest("/"), true);
|
||||
session.newRequest(frame, new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
assertThat(response.getStatus(), is(HttpStatus.OK_200));
|
||||
clientResponseLatch.countDown();
|
||||
}
|
||||
})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue