Merged branch 'jetty-11.0.x' into 'jetty-12.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-11-14 16:11:52 +01:00
commit bc3aa21b92
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
8 changed files with 307 additions and 170 deletions

View File

@ -82,12 +82,23 @@ public class HTTPSessionListenerPromise implements Session.Listener, Promise<Ses
return new HttpConnectionOverHTTP2(destination, session);
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
if (!failConnectionPromise(new ClosedChannelException()))
{
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
connection.remove();
}
}
@Override
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
if (!failConnectionPromise(new ClosedChannelException()))
{
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
onClose(connection, frame);
}
@ -106,7 +117,7 @@ public class HTTPSessionListenerPromise implements Session.Listener, Promise<Ses
TimeoutException failure = new TimeoutException("Idle timeout expired: " + idleTimeout + " ms");
if (failConnectionPromise(failure))
return true;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
return connection.onIdleTimeout(idleTimeout, failure);
return true;
@ -117,7 +128,7 @@ public class HTTPSessionListenerPromise implements Session.Listener, Promise<Ses
{
if (!failConnectionPromise(failure))
{
HttpConnectionOverHTTP2 connection = this.connection.getReference();
HttpConnectionOverHTTP2 connection = getConnection();
if (connection != null)
connection.close(failure);
}
@ -131,4 +142,9 @@ public class HTTPSessionListenerPromise implements Session.Listener, Promise<Ses
httpConnectionPromise().failed(failure);
return result;
}
private HttpConnectionOverHTTP2 getConnection()
{
return connection.getReference();
}
}

View File

@ -193,6 +193,11 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
return false;
}
void remove()
{
getHttpDestination().remove(this);
}
@Override
public void close()
{

View File

@ -79,7 +79,6 @@ public class HeadersFrame extends StreamFrame
@Override
public String toString()
{
return String.format("%s#%d{end=%b}%s", super.toString(), getStreamId(), endStream,
priority == null ? "" : String.format("+%s", priority));
return String.format("%s#%d[end=%b,{%s},priority=%s]", super.toString(), getStreamId(), isEndStream(), getMetaData(), getPriority());
}
}

View File

@ -2051,12 +2051,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
{
int streamId;
try (AutoLock ignored = lock.lock())
{
streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId);
}
int streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId);
HTTP2Stream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
@ -2072,30 +2068,22 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private boolean newRemoteStream(int streamId)
{
boolean created;
try (AutoLock ignored = lock.lock())
{
switch (closed)
created = switch (closed)
{
case NOT_CLOSED:
{
HTTP2Session.this.onStreamCreated(streamId);
return true;
}
case LOCALLY_CLOSED:
{
case NOT_CLOSED -> true;
case LOCALLY_CLOSED ->
// SPEC: streams larger than GOAWAY's lastStreamId are dropped.
if (streamId <= goAwaySent.getLastStreamId())
{
// Allow creation of streams that may have been in-flight.
HTTP2Session.this.onStreamCreated(streamId);
return true;
}
return false;
}
default:
return false;
}
// Allow creation of streams that may have been in-flight.
streamId <= goAwaySent.getLastStreamId();
default -> false;
};
}
if (created)
HTTP2Session.this.onStreamCreated(streamId);
return created;
}
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
@ -2156,14 +2144,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
Throwable failure = null;
boolean reserved = false;
try (AutoLock ignored = lock.lock())
{
// SPEC: cannot create new streams after receiving a GOAWAY.
if (closed == CloseState.NOT_CLOSED)
{
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId);
reserved = true;
}
slots.offer(slot);
}
@ -2175,9 +2165,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
}
}
if (failure == null)
{
if (reserved)
HTTP2Session.this.onStreamCreated(streamId);
return streamId;
fail.accept(failure);
return 0;
}
else
{
fail.accept(failure);
return 0;
}
}
private void freeSlot(Slot slot, int streamId)

View File

@ -375,9 +375,10 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
// here only handle responses and trailers.
if (metaData.isResponse() || !metaData.isRequest())
{
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
getSession().removeStream(this);
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(this, frame);
if (closed)
getSession().removeStream(this);
}
if (frame.isEndStream())

View File

@ -110,10 +110,11 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
stream.process(frame, Callback.NOOP);
if (stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
removeStream(stream);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
if (closed)
removeStream(stream);
}
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http2.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -20,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@ -44,8 +46,17 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GoAwayTest extends AbstractTest
{
@ -95,12 +106,12 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)serverSessionRef.get()).getCloseState());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)clientSession).getCloseState());
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertSame(CloseState.CLOSED, ((HTTP2Session)serverSessionRef.get()).getCloseState());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertSame(CloseState.CLOSED, ((HTTP2Session)clientSession).getCloseState());
}
@Test
@ -177,14 +188,14 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
}
@Test
@ -254,20 +265,20 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
// Send a graceful GOAWAY from the server.
// Because the server had no pending streams, it will send also a non-graceful GOAWAY.
((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -343,32 +354,32 @@ public class GoAwayTest extends AbstractTest
});
// Wait for the graceful GOAWAY.
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Now the client cannot create new streams.
FuturePromise<Stream> streamPromise = new FuturePromise<>();
clientSession.newStream(new HeadersFrame(newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY), null, true), streamPromise, null);
Assertions.assertThrows(ExecutionException.class, () -> streamPromise.get(5, TimeUnit.SECONDS));
assertThrows(ExecutionException.class, () -> streamPromise.get(5, TimeUnit.SECONDS));
// The client must not reply to a graceful GOAWAY.
Assertions.assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS));
assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS));
// Previous streams must complete successfully.
Stream serverStream = serverStreamRef.get();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP);
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
// The server should have sent the GOAWAY after the last stream completed.
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -380,54 +391,54 @@ public class GoAwayTest extends AbstractTest
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener()
{
@Override
public void onAccept(Session session)
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
}
serverSessionRef.set(session);
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
stream.demand();
AtomicInteger dataFrames = new AtomicInteger();
return new Stream.Listener()
{
stream.demand();
AtomicInteger dataFrames = new AtomicInteger();
return new Stream.Listener()
@Override
public void onDataAvailable(Stream stream)
{
@Override
public void onDataAvailable(Stream stream)
stream.readData();
// Do not release the Data for this stream.
// Only send the response after reading the first DATA frame.
if (dataFrames.incrementAndGet() == 1)
{
stream.readData();
// Do not release the Data for this stream.
// Only send the response after reading the first DATA frame.
if (dataFrames.incrementAndGet() == 1)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
};
}
}
};
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
serverCloseLatch.countDown();
callback.succeeded();
}
}, h2 ->
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
// Use the simple, predictable, strategy for window updates.
h2.setFlowControlStrategyFactory(SimpleFlowControlStrategy::new);
h2.setInitialSessionRecvWindow(flowControlWindow);
h2.setInitialStreamRecvWindow(flowControlWindow);
});
serverGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame, Callback callback)
{
serverCloseLatch.countDown();
callback.succeeded();
}
}, h2 ->
{
// Use the simple, predictable, strategy for window updates.
h2.setFlowControlStrategyFactory(SimpleFlowControlStrategy::new);
h2.setInitialSessionRecvWindow(flowControlWindow);
h2.setInitialStreamRecvWindow(flowControlWindow);
});
CountDownLatch clientSettingsLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
@ -457,7 +468,7 @@ public class GoAwayTest extends AbstractTest
// Wait for the server settings to be received by the client.
// In particular, we want to wait for the initial stream flow
// control window setting before we create the first stream below.
Assertions.assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
// This is necessary because the server session window is smaller than the
// default and the server cannot send a WINDOW_UPDATE with a negative value.
@ -495,13 +506,13 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -566,29 +577,29 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
// The client sends a GOAWAY.
clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
// The client must not receive a GOAWAY until the all streams are completed.
Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
// Complete the stream.
Stream serverStream = serverStreamRef.get();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP);
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -659,7 +670,7 @@ public class GoAwayTest extends AbstractTest
});
// The server has a pending stream, so it does not send the non-graceful GOAWAY yet.
Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
// Complete the stream, the server should send the non-graceful GOAWAY.
Stream serverStream = serverStreamRef.get();
@ -668,14 +679,14 @@ public class GoAwayTest extends AbstractTest
// The server already received the client GOAWAY,
// so completing the last stream produces a close event.
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
// The client should receive the server non-graceful GOAWAY.
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -759,19 +770,19 @@ public class GoAwayTest extends AbstractTest
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
// The server should send a graceful GOAWAY.
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Complete the stream.
clientStream.data(new DataFrame(clientStream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
// Both client and server should send a non-graceful GOAWAY.
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverStreamRef.get().getSession()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverStreamRef.get().getSession()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -797,8 +808,8 @@ public class GoAwayTest extends AbstractTest
((HTTP2Session)clientSession).getEndPoint().close();
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
}
@Test
@ -837,8 +848,8 @@ public class GoAwayTest extends AbstractTest
// Send a graceful GOAWAY to the client.
((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
}
// TODO: add a shutdown test with pending stream.
@ -894,15 +905,15 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Server should send a GOAWAY to the client.
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// The client replied to server's GOAWAY, but the server already closed.
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -972,15 +983,15 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Server idle timeout sends a non-graceful GOAWAY.
Assertions.assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -1054,14 +1065,14 @@ public class GoAwayTest extends AbstractTest
// Client sends a graceful GOAWAY.
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@Test
@ -1118,18 +1129,121 @@ public class GoAwayTest extends AbstractTest
}
});
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// Neither the client nor the server are finishing
// the pending stream, so force the stop on the server.
LifeCycle.stop(serverSessionRef.get());
// The server should reset all the pending streams.
Assertions.assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConnectionIsRemovedFromPoolOnGracefulGoAwayReceived(boolean graceful) throws Exception
{
long timeout = 5000;
AtomicReference<Response> responseRef = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
start(new ServerSessionListener()
{
private Stream goAwayStream;
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
String path = request.getURI().getPath();
if ("/prime".equals(path))
{
respond(stream);
}
else if ("/goaway".equals(path))
{
try
{
goAwayStream = stream;
if (graceful)
{
// Send to the client a graceful GOAWAY.
stream.getSession().shutdown();
}
else
{
// Send to the client a non-graceful GOAWAY.
stream.getSession().close(ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, null, Callback.NOOP);
}
// Wait for the client to receive the GOAWAY.
Thread.sleep(1000);
// This request will be performed on a different connection.
httpClient.newRequest("localhost", connector.getLocalPort())
.path("/after")
.timeout(timeout / 2, TimeUnit.MILLISECONDS)
.send(result ->
{
responseRef.set(result.getResponse());
responseLatch.countDown();
});
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
else if ("/after".equals(path))
{
// Wait for the /after request to arrive to the server
// before answering to the /goaway request.
// The /goaway request must succeed because it's in
// flight and seen by the server when the GOAWAY happens,
// so it will be completed before closing the connection.
respond(goAwayStream);
respond(stream);
}
return null;
}
private void respond(Stream stream)
{
long remotePort = ((InetSocketAddress)stream.getSession().getRemoteSocketAddress()).getPort();
HttpFields responseHeaders = HttpFields.build().putLongField("X-Remote-Port", remotePort);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, responseHeaders);
stream.headers(new HeadersFrame(stream.getId(), response, null, true));
}
});
Response response = httpClient.newRequest("localhost", connector.getLocalPort())
.path("/prime")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
long primePort = response.getHeaders().getLongField("X-Remote-Port");
response = httpClient.newRequest("localhost", connector.getLocalPort())
.path("/goaway")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
long goAwayPort = response.getHeaders().getLongField("X-Remote-Port");
assertEquals(primePort, goAwayPort);
assertTrue(responseLatch.await(timeout, TimeUnit.MILLISECONDS));
response = responseRef.get();
assertNotNull(response);
assertEquals(HttpStatus.OK_200, response.getStatus());
// The /after request must happen on a different port
// because the first connection has been removed from the pool.
long afterPort = response.getHeaders().getLongField("X-Remote-Port");
assertNotEquals(primePort, afterPort);
}
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@ -55,8 +56,10 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Graceful;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -513,7 +516,8 @@ public class HTTP2Test extends AbstractTest
}
});
assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS));
assertEquals(1, session.getStreams().size());
// The stream is removed from the session just after returning from onHeaders(), so wait a little bit.
await().atMost(Duration.ofSeconds(1)).until(() -> session.getStreams().size(), is(1));
// End the first stream.
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()