* Fixes #8678 - Jetty client receives GO_AWAY and continue to send traffic on same connection * Now upon receiving the GOAWAY, the connection is removed from the pool, so it cannot be used by new requests. * HTTP2Session.removeStream() now happens _after_ notifying HEADERS and DATA events, although the Stream state change still happens before. This is necessary to avoid that a "close" event is notified before a "headers" or "data" event. With these changes, the race window of a client acquiring a connection while the server is closing it is reduced, but it is impossible to close it completely. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
12041da72f
commit
df265e0abb
|
@ -77,6 +77,11 @@
|
|||
<artifactId>http2-server</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -58,9 +58,10 @@ public class HTTP2ClientSession extends HTTP2Session
|
|||
else
|
||||
{
|
||||
stream.process(frame, Callback.NOOP);
|
||||
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
removeStream(stream);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyHeaders(stream, frame);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
@ -60,8 +61,10 @@ import org.eclipse.jetty.util.Promise;
|
|||
import org.eclipse.jetty.util.component.Graceful;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
@ -506,7 +509,8 @@ public class HTTP2Test extends AbstractTest
|
|||
}
|
||||
});
|
||||
assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS));
|
||||
assertEquals(1, session.getStreams().size());
|
||||
// The stream is removed from the session just after returning from onHeaders(), so wait a little bit.
|
||||
await().atMost(Duration.ofSeconds(1)).until(() -> session.getStreams().size(), is(1));
|
||||
|
||||
// End the first stream.
|
||||
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
|
||||
|
|
|
@ -2139,12 +2139,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
|
||||
{
|
||||
int streamId;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
streamId = localStreamIds.getAndAdd(2);
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
}
|
||||
int streamId = localStreamIds.getAndAdd(2);
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x ->
|
||||
{
|
||||
HTTP2Session.this.onStreamDestroyed(streamId);
|
||||
|
@ -2160,14 +2156,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private boolean newRemoteStream(int streamId)
|
||||
{
|
||||
boolean created = false;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
switch (closed)
|
||||
{
|
||||
case NOT_CLOSED:
|
||||
{
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
return true;
|
||||
created = true;
|
||||
break;
|
||||
}
|
||||
case LOCALLY_CLOSED:
|
||||
{
|
||||
|
@ -2175,15 +2172,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
if (streamId <= goAwaySent.getLastStreamId())
|
||||
{
|
||||
// Allow creation of streams that may have been in-flight.
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
return true;
|
||||
created = true;
|
||||
}
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (created)
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
return created;
|
||||
}
|
||||
|
||||
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
|
||||
|
@ -2244,14 +2243,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
|
||||
{
|
||||
Throwable failure = null;
|
||||
boolean reserved = false;
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
// SPEC: cannot create new streams after receiving a GOAWAY.
|
||||
if (closed == CloseState.NOT_CLOSED)
|
||||
{
|
||||
if (streamId <= 0)
|
||||
{
|
||||
streamId = localStreamIds.getAndAdd(2);
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
reserved = true;
|
||||
}
|
||||
slots.offer(slot);
|
||||
}
|
||||
|
@ -2263,9 +2264,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
if (failure == null)
|
||||
{
|
||||
if (reserved)
|
||||
HTTP2Session.this.onStreamCreated(streamId);
|
||||
return streamId;
|
||||
fail.accept(failure);
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
fail.accept(failure);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private void freeSlot(Slot slot, int streamId)
|
||||
|
|
|
@ -536,9 +536,10 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.
|
|||
dataEntry = dataQueue.poll();
|
||||
}
|
||||
DataFrame frame = dataEntry.frame;
|
||||
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyDataDemanded(this, frame, dataEntry.callback);
|
||||
if (closed)
|
||||
session.removeStream(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,6 @@ public class HeadersFrame extends StreamFrame
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
|
||||
priority == null ? "" : String.format("+%s", priority));
|
||||
return String.format("%s#%d[end=%b,{%s},priority=%s]", super.toString(), getStreamId(), isEndStream(), getMetaData(), getPriority());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,12 +81,22 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
|
|||
return new HttpConnectionOverHTTP2(destination, session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (failConnectionPromise(new ClosedChannelException()))
|
||||
return;
|
||||
HttpConnectionOverHTTP2 connection = getConnection();
|
||||
if (connection != null)
|
||||
connection.remove();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (failConnectionPromise(new ClosedChannelException()))
|
||||
return;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
HttpConnectionOverHTTP2 connection = getConnection();
|
||||
if (connection != null)
|
||||
onClose(connection, frame);
|
||||
}
|
||||
|
@ -103,7 +113,7 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
|
|||
TimeoutException failure = new TimeoutException("Idle timeout expired: " + idleTimeout + " ms");
|
||||
if (failConnectionPromise(failure))
|
||||
return true;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
HttpConnectionOverHTTP2 connection = getConnection();
|
||||
if (connection != null)
|
||||
return connection.onIdleTimeout(idleTimeout, failure);
|
||||
return true;
|
||||
|
@ -114,7 +124,7 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
|
|||
{
|
||||
if (failConnectionPromise(failure))
|
||||
return;
|
||||
HttpConnectionOverHTTP2 connection = this.connection.getReference();
|
||||
HttpConnectionOverHTTP2 connection = getConnection();
|
||||
if (connection != null)
|
||||
connection.close(failure);
|
||||
}
|
||||
|
@ -126,4 +136,9 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
|
|||
httpConnectionPromise().failed(failure);
|
||||
return result;
|
||||
}
|
||||
|
||||
private HttpConnectionOverHTTP2 getConnection()
|
||||
{
|
||||
return connection.getReference();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -193,6 +193,11 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
return false;
|
||||
}
|
||||
|
||||
void remove()
|
||||
{
|
||||
getHttpDestination().remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class GoAwayTest extends AbstractTest
|
||||
{
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testConnectionIsRemovedFromPoolOnGracefulGoAwayReceived(boolean graceful) throws Exception
|
||||
{
|
||||
long timeout = 5000;
|
||||
AtomicReference<Response> responseRef = new AtomicReference<>();
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
private Stream goAwayStream;
|
||||
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||
String path = request.getURI().getPath();
|
||||
|
||||
if ("/prime".equals(path))
|
||||
{
|
||||
respond(stream);
|
||||
}
|
||||
else if ("/goaway".equals(path))
|
||||
{
|
||||
try
|
||||
{
|
||||
goAwayStream = stream;
|
||||
|
||||
if (graceful)
|
||||
{
|
||||
// Send to the client a graceful GOAWAY.
|
||||
((ISession)stream.getSession()).shutdown();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send to the client a non-graceful GOAWAY.
|
||||
stream.getSession().close(ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, null, Callback.NOOP);
|
||||
}
|
||||
|
||||
// Wait for the client to receive the GOAWAY.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// This request will be performed on a different connection.
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/after")
|
||||
.timeout(timeout / 2, TimeUnit.MILLISECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
responseRef.set(result.getResponse());
|
||||
responseLatch.countDown();
|
||||
});
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
else if ("/after".equals(path))
|
||||
{
|
||||
// Wait for the /after request to arrive to the server
|
||||
// before answering to the /goaway request.
|
||||
// The /goaway request must succeed because it's in
|
||||
// flight and seen by the server when the GOAWAY happens,
|
||||
// so it will be completed before closing the connection.
|
||||
respond(goAwayStream);
|
||||
respond(stream);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void respond(Stream stream)
|
||||
{
|
||||
long remotePort = ((InetSocketAddress)stream.getSession().getRemoteSocketAddress()).getPort();
|
||||
HttpFields responseHeaders = HttpFields.build().putLongField("X-Remote-Port", remotePort);
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseHeaders);
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true));
|
||||
}
|
||||
});
|
||||
|
||||
Response response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/prime")
|
||||
.timeout(timeout, TimeUnit.MILLISECONDS)
|
||||
.send();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
long primePort = response.getHeaders().getLongField("X-Remote-Port");
|
||||
|
||||
response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/goaway")
|
||||
.timeout(timeout, TimeUnit.MILLISECONDS)
|
||||
.send();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
long goAwayPort = response.getHeaders().getLongField("X-Remote-Port");
|
||||
assertEquals(primePort, goAwayPort);
|
||||
|
||||
assertTrue(responseLatch.await(timeout, TimeUnit.MILLISECONDS));
|
||||
response = responseRef.get();
|
||||
assertNotNull(response);
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
// The /after request must happen on a different port
|
||||
// because the first connection has been removed from the pool.
|
||||
long afterPort = response.getHeaders().getLongField("X-Remote-Port");
|
||||
assertNotEquals(primePort, afterPort);
|
||||
}
|
||||
}
|
|
@ -110,10 +110,11 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
}
|
||||
|
||||
stream.process(frame, Callback.NOOP);
|
||||
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
removeStream(stream);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
Stream.Listener listener = notifyNewStream(stream, frame);
|
||||
stream.setListener(listener);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,9 +133,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
if (stream != null)
|
||||
{
|
||||
stream.process(frame, Callback.NOOP);
|
||||
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
removeStream(stream);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyHeaders(stream, frame);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue