diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 92e1158e9d6..df999131b73 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -24,8 +24,11 @@ import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpReceiver; import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.Callback; public class HttpChannelOverHTTP2 extends HttpChannel { @@ -33,6 +36,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel private final Session session; private final HttpSenderOverHTTP2 sender; private final HttpReceiverOverHTTP2 receiver; + private Stream stream; public HttpChannelOverHTTP2(HttpDestination destination, HttpConnectionOverHTTP2 connection, Session session) { @@ -65,6 +69,16 @@ public class HttpChannelOverHTTP2 extends HttpChannel return receiver; } + public Stream getStream() + { + return stream; + } + + public void setStream(Stream stream) + { + this.stream = stream; + } + @Override public void send() { @@ -79,6 +93,19 @@ public class HttpChannelOverHTTP2 extends HttpChannel connection.release(this); } + @Override + public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure) + { + boolean aborted = super.abort(exchange, requestFailure, responseFailure); + if (aborted) + { + Stream stream = getStream(); + if (stream != null) + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + } + return aborted; + } + @Override public void exchangeTerminated(HttpExchange exchange, Result result) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index db01d33c179..b13e68e0971 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -33,8 +33,6 @@ import org.eclipse.jetty.util.Promise; public class HttpSenderOverHTTP2 extends HttpSender { - private Stream stream; - public HttpSenderOverHTTP2(HttpChannelOverHTTP2 channel) { super(channel); @@ -59,7 +57,7 @@ public class HttpSenderOverHTTP2 extends HttpSender @Override public void succeeded(Stream stream) { - HttpSenderOverHTTP2.this.stream = stream; + getHttpChannel().setStream(stream); stream.setIdleTimeout(request.getIdleTimeout()); if (content.hasContent() && !expects100Continue(request)) @@ -95,15 +93,9 @@ public class HttpSenderOverHTTP2 extends HttpSender } else { + Stream stream = getHttpChannel().getStream(); DataFrame frame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast()); stream.data(frame, callback); } } - - @Override - protected void reset() - { - super.reset(); - stream = null; - } } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java index d4214dd29fa..812253992b6 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java @@ -19,8 +19,11 @@ package org.eclipse.jetty.http2.client.http; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Server; @@ -38,25 +41,38 @@ public class AbstractTest protected ServerConnector connector; protected HttpClient client; - protected void start(int maxConcurrentStreams, Handler handler) throws Exception + protected void start(ServerSessionListener listener) throws Exception + { + prepareServer(new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener)); + server.start(); + prepareClient(); + client.start(); + } + + protected void start(Handler handler) throws Exception + { + prepareServer(new HTTP2ServerConnectionFactory(new HttpConfiguration())); + server.setHandler(handler); + server.start(); + prepareClient(); + client.start(); + } + + protected void prepareServer(ConnectionFactory connectionFactory) { QueuedThreadPool serverExecutor = new QueuedThreadPool(); serverExecutor.setName("server"); server = new Server(serverExecutor); - - HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); - http2.setMaxConcurrentStreams(maxConcurrentStreams); - connector = new ServerConnector(server, 1, 1, http2); + connector = new ServerConnector(server, 1, 1, connectionFactory); server.addConnector(connector); + } - server.setHandler(handler); - server.start(); - + protected void prepareClient() throws Exception + { client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()), null); QueuedThreadPool clientExecutor = new QueuedThreadPool(); clientExecutor.setName("client"); client.setExecutor(clientExecutor); - client.start(); } @After diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index c2334eafacc..1dcd6066d08 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -18,19 +18,32 @@ package org.eclipse.jetty.http2.client.http; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -public class HttpClientTransportOverHTTP2Test +public class HttpClientTransportOverHTTP2Test extends AbstractTest { @Test public void testPropertiesAreForwarded() throws Exception @@ -56,6 +69,83 @@ public class HttpClientTransportOverHTTP2Test Assert.assertTrue(http2Client.isStopped()); } + @Test + public void testRequestAbortSendsResetFrame() throws Exception + { + CountDownLatch resetLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); + } + }; + } + }); + + try + { + client.newRequest("localhost", connector.getLocalPort()) + .onRequestCommit(request -> request.abort(new Exception("explicitly_aborted_by_test"))) + .send(); + Assert.fail(); + } + catch (ExecutionException x) + { + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testResponseAbortSendsResetFrame() throws Exception + { + CountDownLatch resetLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), metaData, null, false), new Callback() + { + @Override + public void succeeded() + { + ByteBuffer data = ByteBuffer.allocate(1024); + stream.data(new DataFrame(stream.getId(), data, false), NOOP); + } + }); + + return new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); + } + }; + } + }); + + try + { + client.newRequest("localhost", connector.getLocalPort()) + .onResponseContent((response, buffer) -> response.abort(new Exception("explicitly_aborted_by_test"))) + .send(); + Assert.fail(); + } + catch (ExecutionException x) + { + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + } + } + @Ignore @Test public void testExternalServer() throws Exception diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index e4ec2963ad7..c64ed4f5a76 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -28,6 +28,9 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; @@ -35,6 +38,17 @@ import org.junit.Test; public class MaxConcurrentStreamsTest extends AbstractTest { + private void start(int maxConcurrentStreams, Handler handler) throws Exception + { + HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + http2.setMaxConcurrentStreams(maxConcurrentStreams); + prepareServer(http2); + server.setHandler(handler); + server.start(); + prepareClient(); + client.start(); + } + @Test public void testOneConcurrentStream() throws Exception {