diff --git a/jetty-http2/http2-client/pom.xml b/jetty-http2/http2-client/pom.xml index a4d5bd55a11..6d73499a737 100644 --- a/jetty-http2/http2-client/pom.xml +++ b/jetty-http2/http2-client/pom.xml @@ -77,6 +77,11 @@ http2-server test + + org.awaitility + awaitility + test + diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 31eb77a94a0..a74bdc500e8 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -58,9 +58,10 @@ public class HTTP2ClientSession extends HTTP2Session else { stream.process(frame, Callback.NOOP); - if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - removeStream(stream); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyHeaders(stream, frame); + if (closed) + removeStream(stream); } } else diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java index 919d26f4aa6..d4536b56cb1 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -60,8 +61,10 @@ import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.Graceful; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -506,7 +509,8 @@ public class HTTP2Test extends AbstractTest } }); assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS)); - assertEquals(1, session.getStreams().size()); + // The stream is removed from the session just after returning from onHeaders(), so wait a little bit. + await().atMost(Duration.ofSeconds(1)).until(() -> session.getStreams().size(), is(1)); // End the first stream. stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback() diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 3860aa99f19..d120ec2464e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -2139,12 +2139,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer failFn) { - int streamId; - try (AutoLock ignored = lock.lock()) - { - streamId = localStreamIds.getAndAdd(2); - HTTP2Session.this.onStreamCreated(streamId); - } + int streamId = localStreamIds.getAndAdd(2); + HTTP2Session.this.onStreamCreated(streamId); IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x -> { HTTP2Session.this.onStreamDestroyed(streamId); @@ -2160,14 +2156,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private boolean newRemoteStream(int streamId) { + boolean created = false; try (AutoLock ignored = lock.lock()) { switch (closed) { case NOT_CLOSED: { - HTTP2Session.this.onStreamCreated(streamId); - return true; + created = true; + break; } case LOCALLY_CLOSED: { @@ -2175,15 +2172,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio if (streamId <= goAwaySent.getLastStreamId()) { // Allow creation of streams that may have been in-flight. - HTTP2Session.this.onStreamCreated(streamId); - return true; + created = true; } - return false; + break; } default: - return false; + break; } } + if (created) + HTTP2Session.this.onStreamCreated(streamId); + return created; } private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) @@ -2244,14 +2243,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private int reserveSlot(Slot slot, int streamId, Consumer fail) { Throwable failure = null; + boolean reserved = false; try (AutoLock ignored = lock.lock()) { + // SPEC: cannot create new streams after receiving a GOAWAY. if (closed == CloseState.NOT_CLOSED) { if (streamId <= 0) { streamId = localStreamIds.getAndAdd(2); - HTTP2Session.this.onStreamCreated(streamId); + reserved = true; } slots.offer(slot); } @@ -2263,9 +2264,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } if (failure == null) + { + if (reserved) + HTTP2Session.this.onStreamCreated(streamId); return streamId; - fail.accept(failure); - return 0; + } + else + { + fail.accept(failure); + return 0; + } } private void freeSlot(Slot slot, int streamId) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 526a0c487e9..fdba58e00c7 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -536,9 +536,10 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. dataEntry = dataQueue.poll(); } DataFrame frame = dataEntry.frame; - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); + boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyDataDemanded(this, frame, dataEntry.callback); + if (closed) + session.removeStream(this); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java index c8f2a1ac93b..5ab758cb5d9 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/HeadersFrame.java @@ -79,7 +79,6 @@ public class HeadersFrame extends StreamFrame @Override public String toString() { - return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream, - priority == null ? "" : String.format("+%s", priority)); + return String.format("%s#%d[end=%b,{%s},priority=%s]", super.toString(), getStreamId(), isEndStream(), getMetaData(), getPriority()); } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java index c00845cd7ca..114b509c3df 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java @@ -81,12 +81,22 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro return new HttpConnectionOverHTTP2(destination, session); } + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (failConnectionPromise(new ClosedChannelException())) + return; + HttpConnectionOverHTTP2 connection = getConnection(); + if (connection != null) + connection.remove(); + } + @Override public void onClose(Session session, GoAwayFrame frame) { if (failConnectionPromise(new ClosedChannelException())) return; - HttpConnectionOverHTTP2 connection = this.connection.getReference(); + HttpConnectionOverHTTP2 connection = getConnection(); if (connection != null) onClose(connection, frame); } @@ -103,7 +113,7 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro TimeoutException failure = new TimeoutException("Idle timeout expired: " + idleTimeout + " ms"); if (failConnectionPromise(failure)) return true; - HttpConnectionOverHTTP2 connection = this.connection.getReference(); + HttpConnectionOverHTTP2 connection = getConnection(); if (connection != null) return connection.onIdleTimeout(idleTimeout, failure); return true; @@ -114,7 +124,7 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro { if (failConnectionPromise(failure)) return; - HttpConnectionOverHTTP2 connection = this.connection.getReference(); + HttpConnectionOverHTTP2 connection = getConnection(); if (connection != null) connection.close(failure); } @@ -126,4 +136,9 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro httpConnectionPromise().failed(failure); return result; } + + private HttpConnectionOverHTTP2 getConnection() + { + return connection.getReference(); + } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index cdfd03c7545..b8fd985e536 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -193,6 +193,11 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S return false; } + void remove() + { + getHttpDestination().remove(this); + } + @Override public void close() { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/GoAwayTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/GoAwayTest.java new file mode 100644 index 00000000000..ccd6579bbe7 --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/GoAwayTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GoAwayTest extends AbstractTest +{ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConnectionIsRemovedFromPoolOnGracefulGoAwayReceived(boolean graceful) throws Exception + { + long timeout = 5000; + AtomicReference responseRef = new AtomicReference<>(); + CountDownLatch responseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + private Stream goAwayStream; + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + String path = request.getURI().getPath(); + + if ("/prime".equals(path)) + { + respond(stream); + } + else if ("/goaway".equals(path)) + { + try + { + goAwayStream = stream; + + if (graceful) + { + // Send to the client a graceful GOAWAY. + ((ISession)stream.getSession()).shutdown(); + } + else + { + // Send to the client a non-graceful GOAWAY. + stream.getSession().close(ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, null, Callback.NOOP); + } + + // Wait for the client to receive the GOAWAY. + Thread.sleep(1000); + + // This request will be performed on a different connection. + client.newRequest("localhost", connector.getLocalPort()) + .path("/after") + .timeout(timeout / 2, TimeUnit.MILLISECONDS) + .send(result -> + { + responseRef.set(result.getResponse()); + responseLatch.countDown(); + }); + } + catch (Exception x) + { + throw new RuntimeException(x); + } + } + else if ("/after".equals(path)) + { + // Wait for the /after request to arrive to the server + // before answering to the /goaway request. + // The /goaway request must succeed because it's in + // flight and seen by the server when the GOAWAY happens, + // so it will be completed before closing the connection. + respond(goAwayStream); + respond(stream); + } + return null; + } + + private void respond(Stream stream) + { + long remotePort = ((InetSocketAddress)stream.getSession().getRemoteSocketAddress()).getPort(); + HttpFields responseHeaders = HttpFields.build().putLongField("X-Remote-Port", remotePort); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseHeaders); + stream.headers(new HeadersFrame(stream.getId(), response, null, true)); + } + }); + + Response response = client.newRequest("localhost", connector.getLocalPort()) + .path("/prime") + .timeout(timeout, TimeUnit.MILLISECONDS) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + long primePort = response.getHeaders().getLongField("X-Remote-Port"); + + response = client.newRequest("localhost", connector.getLocalPort()) + .path("/goaway") + .timeout(timeout, TimeUnit.MILLISECONDS) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + long goAwayPort = response.getHeaders().getLongField("X-Remote-Port"); + assertEquals(primePort, goAwayPort); + + assertTrue(responseLatch.await(timeout, TimeUnit.MILLISECONDS)); + response = responseRef.get(); + assertNotNull(response); + assertEquals(HttpStatus.OK_200, response.getStatus()); + // The /after request must happen on a different port + // because the first connection has been removed from the pool. + long afterPort = response.getHeaders().getLongField("X-Remote-Port"); + assertNotEquals(primePort, afterPort); + } +} diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 7d499c5f35f..2c9d87fa9c4 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -110,10 +110,11 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis } stream.process(frame, Callback.NOOP); - if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - removeStream(stream); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); Stream.Listener listener = notifyNewStream(stream, frame); stream.setListener(listener); + if (closed) + removeStream(stream); } } } @@ -132,9 +133,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis if (stream != null) { stream.process(frame, Callback.NOOP); - if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - removeStream(stream); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyHeaders(stream, frame); + if (closed) + removeStream(stream); } else {