Issue #6728 - QUIC and HTTP/3

- Implemented DATA frame generation and parsing.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-14 09:08:23 +02:00
parent 59664ecaf6
commit f63a7efc5a
9 changed files with 285 additions and 39 deletions

View File

@ -35,6 +35,11 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-slf4j-impl</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -15,7 +15,9 @@ package org.eclipse.jetty.http3.api;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
public interface Stream
{
@ -27,6 +29,10 @@ public interface Stream
{
}
public default void onData(Stream stream, DataFrame frame, Callback callback)
{
}
public default void onTrailer(Stream stream, HeadersFrame frame)
{
}

View File

@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
@ -60,6 +61,11 @@ public abstract class HTTP3Session implements Session, ParserListener
return streams.computeIfAbsent(streamId, id -> new HTTP3Stream(this, streamId));
}
protected HTTP3Stream getStream(long streamId)
{
return streams.get(streamId);
}
protected abstract void writeFrame(long streamId, Frame frame, Callback callback);
public Map<Long, Long> onPreface()
@ -163,4 +169,27 @@ public abstract class HTTP3Session implements Session, ParserListener
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
public void onData(long streamId, DataFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this);
// The stream must already exist.
HTTP3Stream stream = getStream(streamId);
// TODO: handle null stream.
// TODO: implement demand mechanism like in HTTP2Stream
// demand(n) should be on Stream, or on a LongConsumer parameter?
// TODO: the callback in HTTP2 was only to notify of data consumption for flow control.
// Here, we don't have to do flow control, but what about retain()/release() for the network buffer?
notifyData(stream, frame, Callback.NOOP);
}
private void notifyData(HTTP3Stream stream, DataFrame frame, Callback callback)
{
stream.getListener().onData(stream, frame, callback);
}
}

View File

@ -13,7 +13,12 @@
package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.io.ByteBufferPool;
public class DataGenerator extends FrameGenerator
@ -21,6 +26,21 @@ public class DataGenerator extends FrameGenerator
@Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
DataFrame dataFrame = (DataFrame)frame;
return generateDataFrame(lease, dataFrame);
}
private int generateDataFrame(ByteBufferPool.Lease lease, DataFrame frame)
{
ByteBuffer data = frame.getData();
int dataLength = data.remaining();
int headerLength = VarLenInt.length(FrameType.DATA.type()) + VarLenInt.length(dataLength);
ByteBuffer header = ByteBuffer.allocate(headerLength);
VarLenInt.generate(header, FrameType.DATA.type());
VarLenInt.generate(header, dataLength);
header.flip();
lease.append(header, false);
lease.append(data, false);
return headerLength + dataLength;
}
}

View File

@ -50,15 +50,15 @@ public class HeadersGenerator extends FrameGenerator
ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers);
encoder.encode(buffer, streamId, frame.getMetaData());
buffer.flip();
int length = buffer.remaining();
int capacity = VarLenInt.length(FrameType.HEADERS.type()) + VarLenInt.length(length);
ByteBuffer header = ByteBuffer.allocate(capacity);
int dataLength = buffer.remaining();
int headerLength = VarLenInt.length(FrameType.HEADERS.type()) + VarLenInt.length(dataLength);
ByteBuffer header = ByteBuffer.allocate(headerLength);
VarLenInt.generate(header, FrameType.HEADERS.type());
VarLenInt.generate(header, length);
VarLenInt.generate(header, dataLength);
header.flip();
lease.append(header, false);
lease.append(buffer, true);
return buffer.remaining();
return headerLength + dataLength;
}
catch (QpackException e)
{

View File

@ -0,0 +1,80 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class DataGenerateParseTest
{
@Test
public void testGenerateParseEmpty()
{
testGenerateParse(BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParse()
{
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
testGenerateParse(ByteBuffer.wrap(bytes));
}
private void testGenerateParse(ByteBuffer byteBuffer)
{
byte[] inputBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(inputBytes);
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes));
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(null, 8192, true).generate(lease, 0, input);
List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null, new ParserListener()
{
@Override
public void onData(long streamId, DataFrame frame)
{
frames.add(frame);
}
});
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
DataFrame output = frames.get(0);
byte[] outputBytes = new byte[output.getData().remaining()];
output.getData().get(outputBytes);
assertArrayEquals(inputBytes, outputBytes);
}
}

View File

@ -0,0 +1,78 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class HeadersGenerateParseTest
{
@Test
public void testGenerateParse()
{
HttpURI uri = HttpURI.from("http://host:1234/path?a=b");
HttpFields fields = HttpFields.build()
.put("User-Agent", "Jetty")
.put("Cookie", "c=d");
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields));
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
List<HeadersFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, decoder, new ParserListener()
{
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
frames.add(frame);
}
});
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
HeadersFrame output = frames.get(0);
MetaData.Request inputMetaData = (MetaData.Request)input.getMetaData();
MetaData.Request outputMetaData = (MetaData.Request)output.getMetaData();
assertEquals(inputMetaData.getMethod(), outputMetaData.getMethod());
assertEquals(inputMetaData.getURIString(), outputMetaData.getURIString());
assertEquals(inputMetaData.getFields(), outputMetaData.getFields());
}
}

View File

@ -19,8 +19,8 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.generator.SettingsGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
@ -48,10 +48,10 @@ public class SettingsGenerateParseTest
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new SettingsGenerator().generate(lease, 0, input);
new ControlGenerator().generate(lease, 0, input);
List<SettingsFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null, new ParserListener()
ControlParser parser = new ControlParser(new ParserListener()
{
@Override
public void onSettings(SettingsFrame frame)

View File

@ -32,8 +32,10 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -41,21 +43,42 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HTTP3ClientServerTest
{
@Test
public void testGETThenResponseWithoutContent() throws Exception
private Server server;
private ServerQuicConnector connector;
private HTTP3Client client;
private void startServer(Session.Server.Listener listener) throws Exception
{
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.p12");
sslContextFactory.setKeyStorePassword("storepwd");
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
Server server = new Server(serverThreads);
server = new Server(serverThreads);
connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
server.addConnector(connector);
server.start();
}
private void startClient() throws Exception
{
client = new HTTP3Client();
client.start();
}
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(server);
}
@Test
public void testConnectTriggersSettingsFrame() throws Exception
{
CountDownLatch serverPrefaceLatch = new CountDownLatch(1);
CountDownLatch serverSettingsLatch = new CountDownLatch(1);
CountDownLatch serverRequestLatch = new CountDownLatch(1);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener()
startServer(new Session.Server.Listener()
{
@Override
public Map<Long, Long> onPreface(Session session)
@ -69,22 +92,8 @@ public class HTTP3ClientServerTest
{
serverSettingsLatch.countDown();
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverRequestLatch.countDown();
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY)));
// Not interested in request data.
return null;
}
}));
server.addConnector(connector);
server.start();
HTTP3Client client = new HTTP3Client();
client.start();
});
startClient();
CountDownLatch clientPrefaceLatch = new CountDownLatch(1);
CountDownLatch clientSettingsLatch = new CountDownLatch(1);
@ -103,7 +112,30 @@ public class HTTP3ClientServerTest
clientSettingsLatch.countDown();
}
})
.get(555, TimeUnit.SECONDS);
.get(5, TimeUnit.SECONDS);
assertNotNull(session);
}
@Test
public void testGETThenResponseWithoutContent() throws Exception
{
CountDownLatch serverRequestLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverRequestLatch.countDown();
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY)));
// Not interested in request data.
return null;
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
CountDownLatch clientResponseLatch = new CountDownLatch(1);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
@ -120,11 +152,7 @@ public class HTTP3ClientServerTest
.get(555, TimeUnit.SECONDS);
assertNotNull(stream);
assertTrue(clientPrefaceLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverPrefaceLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverSettingsLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientSettingsLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
}
}