Fixed handling of DATA frames arriving _before_ the upgrade.
Now CONNECT with :protocol requests will demand DATA frames only after the upgrade. Other requests will demand DATA frames during the handling of the request HEADERS frame. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
5e695919d9
commit
c6ec9fb4d2
|
@ -1101,7 +1101,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
return port == 80;
|
||||
}
|
||||
|
||||
static boolean isSchemeSecure(String scheme)
|
||||
public static boolean isSchemeSecure(String scheme)
|
||||
{
|
||||
return HttpScheme.HTTPS.is(scheme) || HttpScheme.WSS.is(scheme);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.stream.Collectors;
|
|||
import org.eclipse.jetty.alpn.client.ALPNClientConnection;
|
||||
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
|
||||
import org.eclipse.jetty.client.AbstractConnectorHttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpRequest;
|
||||
|
@ -36,7 +37,6 @@ import org.eclipse.jetty.client.MultiplexConnectionPool;
|
|||
import org.eclipse.jetty.client.MultiplexHttpDestination;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.ClientConnector;
|
||||
|
@ -121,7 +121,7 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans
|
|||
@Override
|
||||
public HttpDestination.Key newDestinationKey(HttpRequest request, Origin origin)
|
||||
{
|
||||
boolean ssl = HttpScheme.HTTPS.is(request.getScheme());
|
||||
boolean ssl = HttpClient.isSchemeSecure(request.getScheme());
|
||||
String http1 = "http/1.1";
|
||||
String http2 = ssl ? "h2" : "h2c";
|
||||
List<String> protocols = List.of();
|
||||
|
|
|
@ -372,16 +372,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
dataProcess = proceed = dataDemand > 0;
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} data processing of {} for {}", initial ? "Starting" : proceed ? "Proceeding" : "Stalling", frame, this);
|
||||
if (initial)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Starting data processing of {} for {}", frame, this);
|
||||
notifyBeforeData(this);
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
dataProcess = proceed = dataDemand > 0;
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} data processing of {} for {}", proceed ? "Proceeding" : "Stalling", frame, this);
|
||||
if (proceed)
|
||||
processData();
|
||||
}
|
||||
|
|
|
@ -102,6 +102,15 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBeforeData(Stream stream)
|
||||
{
|
||||
// Do not notify DATA frame listeners until demanded.
|
||||
// This allows CONNECT requests with pseudo header :protocol
|
||||
// (e.g. WebSocket over HTTP/2) to buffer DATA frames
|
||||
// until they upgrade and are ready to process them.
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(Session session)
|
||||
{
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.eclipse.jetty.http.HttpFields;
|
|||
import org.eclipse.jetty.http.HttpGenerator;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http.PreEncodedHttpField;
|
||||
|
@ -140,8 +139,13 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
|
|||
onRequestComplete();
|
||||
}
|
||||
|
||||
boolean connect = request instanceof MetaData.ConnectRequest;
|
||||
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
|
||||
!endStream && !_expect100Continue && !HttpMethod.CONNECT.is(request.getMethod());
|
||||
!endStream && !_expect100Continue && !connect;
|
||||
|
||||
// Delay the demand of DATA frames for CONNECT with :protocol.
|
||||
if (!connect || request.getProtocol() == null)
|
||||
getStream().demand(1);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
|
|
@ -318,12 +318,15 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
|
||||
Request request = channel.getRequest();
|
||||
if (request.getHttpInput().hasContent())
|
||||
return channel.sendErrorOrAbort("Unexpected content in CONNECT request");
|
||||
Connection connection = (Connection)request.getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
|
||||
EndPoint endPoint = connection.getEndPoint();
|
||||
endPoint.upgrade(connection);
|
||||
stream.setAttachment(endPoint);
|
||||
if (request.getHttpInput().hasContent())
|
||||
return channel.sendErrorOrAbort("Unexpected content in CONNECT request");
|
||||
// Only now that we have switched the attachment,
|
||||
// we can demand DATA frames to process them.
|
||||
stream.demand(1);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,10 +18,15 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.tests;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
|
@ -54,7 +59,6 @@ import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
|
|||
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
@ -69,9 +73,14 @@ public class WebSocketOverHTTP2Test
|
|||
private Server server;
|
||||
private ServerConnector connector;
|
||||
private ServerConnector tlsConnector;
|
||||
private WebSocketClient wsClient;
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
private void startServer() throws Exception
|
||||
{
|
||||
startServer(new TestJettyWebSocketServlet());
|
||||
}
|
||||
|
||||
private void startServer(TestJettyWebSocketServlet servlet) throws Exception
|
||||
{
|
||||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
serverThreads.setName("server");
|
||||
|
@ -92,29 +101,37 @@ public class WebSocketOverHTTP2Test
|
|||
HttpConnectionFactory h1s = new HttpConnectionFactory(httpsConfig);
|
||||
HTTP2ServerConnectionFactory h2s = new HTTP2ServerConnectionFactory(httpsConfig);
|
||||
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
|
||||
alpn.setDefaultProtocol(h1c.getProtocol());
|
||||
alpn.setDefaultProtocol(h1s.getProtocol());
|
||||
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
|
||||
tlsConnector = new ServerConnector(server, 1, 1, ssl, alpn, h2s, h1s);
|
||||
tlsConnector = new ServerConnector(server, 1, 1, ssl, alpn, h1s, h2s);
|
||||
server.addConnector(tlsConnector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler(server, "/");
|
||||
context.addServlet(new ServletHolder(new JettyWebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
protected void configure(JettyWebSocketServletFactory factory)
|
||||
{
|
||||
factory.addMapping("/ws/echo", (req, resp) -> new EchoSocket());
|
||||
}
|
||||
}), "/ws/*");
|
||||
context.addServlet(new ServletHolder(servlet), "/ws/*");
|
||||
JettyWebSocketServletContainerInitializer.initialize(context);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
private void startClient(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
|
||||
{
|
||||
ClientConnector clientConnector = new ClientConnector();
|
||||
clientConnector.setSslContextFactory(new SslContextFactory.Client(true));
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("client");
|
||||
clientConnector.setExecutor(clientThreads);
|
||||
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, protocolFn.apply(clientConnector)));
|
||||
wsClient = new WebSocketClient(httpClient);
|
||||
wsClient.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
if (server != null)
|
||||
server.stop();
|
||||
if (wsClient != null)
|
||||
wsClient.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -131,14 +148,8 @@ public class WebSocketOverHTTP2Test
|
|||
|
||||
private void testWebSocketOverDynamicTransport(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
|
||||
{
|
||||
ClientConnector clientConnector = new ClientConnector();
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("client");
|
||||
clientConnector.setExecutor(clientThreads);
|
||||
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, protocolFn.apply(clientConnector)));
|
||||
|
||||
WebSocketClient wsClient = new WebSocketClient(httpClient);
|
||||
wsClient.start();
|
||||
startServer();
|
||||
startClient(protocolFn);
|
||||
|
||||
EventSocket wsEndPoint = new EventSocket();
|
||||
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/echo");
|
||||
|
@ -160,18 +171,11 @@ public class WebSocketOverHTTP2Test
|
|||
@Test
|
||||
public void testConnectProtocolDisabled() throws Exception
|
||||
{
|
||||
startServer();
|
||||
AbstractHTTP2ServerConnectionFactory h2c = connector.getBean(AbstractHTTP2ServerConnectionFactory.class);
|
||||
h2c.setConnectProtocolEnabled(false);
|
||||
|
||||
ClientConnector clientConnector = new ClientConnector();
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("client");
|
||||
clientConnector.setExecutor(clientThreads);
|
||||
HTTP2Client http2Client = new HTTP2Client(clientConnector);
|
||||
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, new ClientConnectionFactoryOverHTTP2.H2C(http2Client)));
|
||||
|
||||
WebSocketClient wsClient = new WebSocketClient(httpClient);
|
||||
wsClient.start();
|
||||
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector)));
|
||||
|
||||
EventSocket wsEndPoint = new EventSocket();
|
||||
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/ws/echo");
|
||||
|
@ -182,4 +186,55 @@ public class WebSocketOverHTTP2Test
|
|||
Throwable cause = failure.getCause();
|
||||
assertThat(cause.getMessage(), containsStringIgnoringCase(ErrorCode.PROTOCOL_ERROR.name()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowWebSocketUpgradeWithHTTP2DataFramesQueued() throws Exception
|
||||
{
|
||||
startServer(new TestJettyWebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
super.service(request, response);
|
||||
// Flush the response to the client then wait before exiting
|
||||
// this method so that the client can send HTTP/2 DATA frames
|
||||
// that will be processed by the server while this method sleeps.
|
||||
response.flushBuffer();
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
startClient(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2(new HTTP2Client(clientConnector)));
|
||||
|
||||
// Connect and send immediately a message, so the message
|
||||
// arrives to the server while the server is still upgrading.
|
||||
EventSocket wsEndPoint = new EventSocket();
|
||||
URI uri = URI.create("wss://localhost:" + tlsConnector.getLocalPort() + "/ws/echo");
|
||||
Session session = wsClient.connect(wsEndPoint, uri).get(5, TimeUnit.SECONDS);
|
||||
String text = "websocket";
|
||||
session.getRemote().sendString(text);
|
||||
|
||||
String message = wsEndPoint.messageQueue.poll(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
assertEquals(text, message);
|
||||
|
||||
session.close(StatusCode.NORMAL, null);
|
||||
assertTrue(wsEndPoint.closeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private static class TestJettyWebSocketServlet extends JettyWebSocketServlet
|
||||
{
|
||||
@Override
|
||||
protected void configure(JettyWebSocketServletFactory factory)
|
||||
{
|
||||
factory.addMapping("/ws/echo", (request, response) -> new EchoSocket());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue