Issue #6728 - QUIC and HTTP/3
- More fixes and improvement to HTTP client transport tests. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
00a4001b5c
commit
aac4232e20
|
@ -55,7 +55,7 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
|
|||
return httpConfiguration;
|
||||
}
|
||||
|
||||
public HTTP3Configuration getConfiguration()
|
||||
public HTTP3Configuration getHTTP3Configuration()
|
||||
{
|
||||
return configuration;
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
|
|||
@Override
|
||||
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
|
||||
{
|
||||
return new ServerHTTP3Session(getConfiguration(), (ServerQuicSession)quicSession, listener);
|
||||
return new ServerHTTP3Session(getHTTP3Configuration(), (ServerQuicSession)quicSession, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -91,11 +91,11 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
public void onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
Runnable runnable = getConnection().onRequest(http3Stream, frame);
|
||||
if (runnable != null)
|
||||
Runnable task = getConnection().onRequest(http3Stream, frame);
|
||||
if (task != null)
|
||||
{
|
||||
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
|
||||
protocolSession.offer(runnable);
|
||||
protocolSession.offer(task, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,11 +103,11 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
Runnable runnable = getConnection().onDataAvailable(http3Stream);
|
||||
if (runnable != null)
|
||||
Runnable task = getConnection().onDataAvailable(http3Stream);
|
||||
if (task != null)
|
||||
{
|
||||
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
|
||||
protocolSession.offer(runnable);
|
||||
protocolSession.offer(task, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,18 +115,26 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
public void onTrailer(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
Runnable runnable = getConnection().onTrailer(http3Stream, frame);
|
||||
if (runnable != null)
|
||||
Runnable task = getConnection().onTrailer(http3Stream, frame);
|
||||
if (task != null)
|
||||
{
|
||||
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
|
||||
protocolSession.offer(runnable);
|
||||
protocolSession.offer(task, false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(Stream stream, Throwable failure)
|
||||
{
|
||||
return getConnection().onIdleTimeout((HTTP3Stream)stream, failure);
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
return getConnection().onIdleTimeout((HTTP3Stream)stream, failure, task ->
|
||||
{
|
||||
if (task != null)
|
||||
{
|
||||
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
|
||||
protocolSession.offer(task, true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -31,31 +31,37 @@ import org.eclipse.jetty.server.Connector;
|
|||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpInput;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class HttpChannelOverHTTP3 extends HttpChannel
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class);
|
||||
|
||||
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
|
||||
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
|
||||
|
||||
private final AutoLock lock = new AutoLock();
|
||||
private final HTTP3Stream stream;
|
||||
private final ServerHTTP3StreamConnection connection;
|
||||
private HttpInput.Content content;
|
||||
private boolean expect100Continue;
|
||||
private boolean delayedUntilContent;
|
||||
|
||||
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection)
|
||||
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP3 transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection)
|
||||
{
|
||||
super(connector, configuration, endPoint, transport);
|
||||
this.stream = stream;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpTransportOverHTTP3 getHttpTransport()
|
||||
{
|
||||
return (HttpTransportOverHTTP3)super.getHttpTransport();
|
||||
}
|
||||
|
||||
void consumeInput()
|
||||
{
|
||||
getRequest().getHttpInput().consumeAll();
|
||||
|
@ -175,15 +181,17 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
if (reset)
|
||||
consumeInput();
|
||||
|
||||
//TODO
|
||||
// getHttpTransport().onStreamTimeout(failure);
|
||||
getHttpTransport().onStreamIdleTimeout(failure);
|
||||
|
||||
failure.addSuppressed(new Throwable("idle timeout"));
|
||||
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
if (content == null)
|
||||
content = new HttpInput.ErrorContent(failure);
|
||||
}
|
||||
|
||||
failure.addSuppressed(new Throwable("HttpInput idle timeout"));
|
||||
// TODO: writing to the content field here is at race with demand?
|
||||
if (content == null)
|
||||
content = new HttpInput.ErrorContent(failure);
|
||||
boolean needed = getRequest().getHttpInput().onContentProducible();
|
||||
|
||||
if (needed || delayed)
|
||||
{
|
||||
consumer.accept(this::handleWithContext);
|
||||
|
@ -214,9 +222,11 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
@Override
|
||||
public boolean needContent()
|
||||
{
|
||||
if (content != null)
|
||||
return true;
|
||||
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
if (content != null)
|
||||
return true;
|
||||
}
|
||||
stream.demand();
|
||||
return false;
|
||||
}
|
||||
|
@ -224,14 +234,18 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
@Override
|
||||
public HttpInput.Content produceContent()
|
||||
{
|
||||
if (content != null)
|
||||
HttpInput.Content result;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
HttpInput.Content result = content;
|
||||
if (!result.isSpecial())
|
||||
content = null;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("produced content {} on {}", result, this);
|
||||
return result;
|
||||
if (content != null)
|
||||
{
|
||||
result = content;
|
||||
if (!result.isSpecial())
|
||||
content = null;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("produced content {} on {}", result, this);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
Stream.Data data = stream.readData();
|
||||
|
@ -240,7 +254,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
if (data == null)
|
||||
return null;
|
||||
|
||||
content = new HttpInput.Content(data.getByteBuffer())
|
||||
result = new HttpInput.Content(data.getByteBuffer())
|
||||
{
|
||||
@Override
|
||||
public boolean isEof()
|
||||
|
@ -260,20 +274,30 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
data.complete();
|
||||
}
|
||||
};
|
||||
boolean handle = onContent(content);
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
content = result;
|
||||
}
|
||||
|
||||
boolean handle = onContent(result);
|
||||
|
||||
boolean isLast = data.isLast();
|
||||
if (isLast)
|
||||
{
|
||||
boolean handleContent = onContentComplete();
|
||||
// This will generate EOF -> must happen before onContentProducible.
|
||||
// This will generate EOF -> must happen before onContentProducible().
|
||||
boolean handleRequest = onRequestComplete();
|
||||
handle |= handleContent | handleRequest;
|
||||
}
|
||||
|
||||
HttpInput.Content result = content;
|
||||
if (!result.isSpecial())
|
||||
content = result.isEof() ? new HttpInput.EofContent() : null;
|
||||
{
|
||||
HttpInput.Content newContent = result.isEof() ? new HttpInput.EofContent() : null;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
content = newContent;
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("produced new content {} on {}", result, this);
|
||||
return result;
|
||||
|
@ -294,21 +318,20 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
@Override
|
||||
protected boolean eof()
|
||||
{
|
||||
HttpInput.Content content = this.content;
|
||||
if (content == null)
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
this.content = new HttpInput.EofContent();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!content.isEof())
|
||||
if (content == null)
|
||||
{
|
||||
content = new HttpInput.EofContent();
|
||||
}
|
||||
else if (!content.isEof())
|
||||
{
|
||||
if (content.remaining() == 0)
|
||||
this.content = new HttpInput.EofContent();
|
||||
content = new HttpInput.EofContent();
|
||||
else
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -278,6 +278,11 @@ public class HttpTransportOverHTTP3 implements HttpTransport
|
|||
}
|
||||
}
|
||||
|
||||
boolean onStreamIdleTimeout(Throwable failure)
|
||||
{
|
||||
return transportCallback.idleTimeout(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(Throwable failure)
|
||||
{
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
package org.eclipse.jetty.http3.server.internal;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
|
||||
|
@ -20,7 +22,6 @@ import org.eclipse.jetty.http3.internal.parser.MessageParser;
|
|||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
|
||||
public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
||||
{
|
||||
|
@ -44,7 +45,7 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
|||
|
||||
public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame)
|
||||
{
|
||||
HttpTransport transport = new HttpTransportOverHTTP3(stream);
|
||||
HttpTransportOverHTTP3 transport = new HttpTransportOverHTTP3(stream);
|
||||
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
|
||||
stream.setAttachment(channel);
|
||||
return channel.onRequest(frame);
|
||||
|
@ -62,10 +63,10 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
|||
return channel.onTrailer(frame);
|
||||
}
|
||||
|
||||
public boolean onIdleTimeout(HTTP3Stream stream, Throwable failure)
|
||||
public boolean onIdleTimeout(HTTP3Stream stream, Throwable failure, Consumer<Runnable> consumer)
|
||||
{
|
||||
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
|
||||
return channel.onIdleTimeout(failure, null); // TODO
|
||||
return channel.onIdleTimeout(failure, consumer);
|
||||
}
|
||||
|
||||
public void onFailure(HTTP3Stream stream, Throwable failure)
|
||||
|
|
|
@ -385,7 +385,7 @@ public class ClientServerTest extends AbstractClientServerTest
|
|||
});
|
||||
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
|
||||
assertNotNull(h3);
|
||||
h3.getConfiguration().setMaxResponseHeadersSize(maxResponseHeadersSize);
|
||||
h3.getHTTP3Configuration().setMaxResponseHeadersSize(maxResponseHeadersSize);
|
||||
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener() {});
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
|
|||
});
|
||||
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
|
||||
assertNotNull(h3);
|
||||
h3.getConfiguration().setStreamIdleTimeout(idleTimeout);
|
||||
h3.getHTTP3Configuration().setStreamIdleTimeout(idleTimeout);
|
||||
|
||||
Session.Client clientSession = http3Client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
|
|
@ -68,9 +68,13 @@ public abstract class ProtocolSession extends ContainerLifeCycle
|
|||
strategy.produce();
|
||||
}
|
||||
|
||||
public void offer(Runnable task)
|
||||
public void offer(Runnable task, boolean dispatch)
|
||||
{
|
||||
producer.offer(task);
|
||||
if (dispatch)
|
||||
strategy.dispatch();
|
||||
else
|
||||
strategy.produce();
|
||||
}
|
||||
|
||||
public QuicStreamEndPoint getStreamEndPoint(long streamId)
|
||||
|
|
|
@ -1000,7 +1000,7 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
|
|||
completeLatch.countDown();
|
||||
});
|
||||
|
||||
assertTrue(completeLatch.await(2 * connectTimeout, TimeUnit.SECONDS));
|
||||
assertTrue(completeLatch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
|
||||
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
|
@ -1203,7 +1203,7 @@ public class HttpClientStreamTest extends AbstractTest<TransportScenario>
|
|||
}
|
||||
});
|
||||
long idleTimeout = 1000;
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
byte[] bytes = "[{\"key\":\"value\"}]".getBytes(StandardCharsets.UTF_8);
|
||||
|
|
|
@ -172,7 +172,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
}
|
||||
});
|
||||
long idleTimeout = 1000;
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
CountDownLatch resultLatch = new CountDownLatch(2);
|
||||
AsyncRequestContent content = new AsyncRequestContent();
|
||||
|
@ -236,7 +236,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
}
|
||||
});
|
||||
long idleTimeout = 2500;
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
AsyncRequestContent content = new AsyncRequestContent(ByteBuffer.allocate(1));
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
|
@ -292,7 +292,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
}
|
||||
});
|
||||
long idleTimeout = 2500;
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
BlockingQueue<Callback> callbacks = new LinkedBlockingQueue<>();
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
|
@ -442,7 +442,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
scenario.httpConfig.setIdleTimeout(httpIdleTimeout);
|
||||
CountDownLatch handlerLatch = new CountDownLatch(1);
|
||||
scenario.start(new BlockingReadHandler(handlerLatch));
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
try (StacklessLogging ignore = new StacklessLogging(HttpChannel.class))
|
||||
{
|
||||
|
@ -510,7 +510,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
});
|
||||
}
|
||||
});
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
AsyncRequestContent content = new AsyncRequestContent(ByteBuffer.allocate(1));
|
||||
CountDownLatch resultLatch = new CountDownLatch(1);
|
||||
|
@ -551,7 +551,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
|
|||
}
|
||||
}
|
||||
});
|
||||
scenario.setServerIdleTimeout(idleTimeout);
|
||||
scenario.setRequestIdleTimeout(idleTimeout);
|
||||
|
||||
byte[] data = new byte[1024];
|
||||
new Random().nextBytes(data);
|
||||
|
|
|
@ -38,10 +38,12 @@ import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
|
|||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http3.client.HTTP3Client;
|
||||
import org.eclipse.jetty.http3.client.http.HttpClientTransportOverHTTP3;
|
||||
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
|
||||
import org.eclipse.jetty.io.ClientConnector;
|
||||
import org.eclipse.jetty.jmx.MBeanContainer;
|
||||
import org.eclipse.jetty.quic.server.QuicServerConnector;
|
||||
import org.eclipse.jetty.server.AbstractConnector;
|
||||
import org.eclipse.jetty.server.ConnectionFactory;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
|
@ -259,20 +261,35 @@ public class TransportScenario
|
|||
((AbstractConnector)connector).setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
public void setServerIdleTimeout(long idleTimeout)
|
||||
public void setRequestIdleTimeout(long idleTimeout)
|
||||
{
|
||||
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
|
||||
if (h2 != null)
|
||||
{
|
||||
h2.setStreamIdleTimeout(idleTimeout);
|
||||
}
|
||||
else
|
||||
setConnectionIdleTimeout(idleTimeout);
|
||||
{
|
||||
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
|
||||
if (h3 != null)
|
||||
h3.getHTTP3Configuration().setStreamIdleTimeout(idleTimeout);
|
||||
else
|
||||
setConnectionIdleTimeout(idleTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
|
||||
{
|
||||
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
|
||||
if (h2 != null)
|
||||
{
|
||||
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (connector instanceof QuicServerConnector)
|
||||
((QuicServerConnector)connector).getQuicConfiguration().setMaxBidirectionalRemoteStreams(maxRequestsPerConnection);
|
||||
}
|
||||
}
|
||||
|
||||
public void start(Handler handler) throws Exception
|
||||
|
|
Loading…
Reference in New Issue