Issue 5310 - Review HTTP/2 GOAWAY handling.
Reimplemented close/idle_timeout/stop/onGoAway/input_shutdown following more closely the specification. In particular, the semantic of sending a GOAWAY is now to: * stop creation of new both local and remote streams * record the last processed stream * continue processing streams that are pending This means that a GOAWAY is "graceful" in the sense that it allows for streams to be completed by applications. The semantic of stop() and idle timeout is harsher: for pending streams a RST_STREAM is sent to the other peer and they are failed locally. Added support for GOAWAY with 2^31-1 lastStreamId. Added support for a peer to send and receive multiple GOAWAY frames. Reviewed the stream creation/destruction mechanism so that when the last stream completes after a GOAWAY, proper actions can be run to tear down the connection. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
044052d717
commit
226d616a8a
|
@ -92,20 +92,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMessagesIn()
|
|
||||||
{
|
|
||||||
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
|
|
||||||
return session.getStreamsOpened();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMessagesOut()
|
|
||||||
{
|
|
||||||
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
|
|
||||||
return session.getStreamsClosed();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onOpen()
|
public void onOpen()
|
||||||
{
|
{
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class AsyncServletTest extends AbstractTest
|
||||||
HeadersFrame frame = new HeadersFrame(metaData, null, true);
|
HeadersFrame frame = new HeadersFrame(metaData, null, true);
|
||||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||||
CountDownLatch resetLatch = new CountDownLatch(1);
|
CountDownLatch failLatch = new CountDownLatch(1);
|
||||||
session.newStream(frame, promise, new Stream.Listener.Adapter()
|
session.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -146,9 +146,10 @@ public class AsyncServletTest extends AbstractTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReset(Stream stream, ResetFrame frame)
|
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
{
|
{
|
||||||
resetLatch.countDown();
|
failLatch.countDown();
|
||||||
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
@ -156,7 +157,7 @@ public class AsyncServletTest extends AbstractTest
|
||||||
|
|
||||||
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
|
assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
assertTrue(failLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -731,6 +731,7 @@ public abstract class FlowControlStrategyTest
|
||||||
public void testClientExceedingSessionWindow() throws Exception
|
public void testClientExceedingSessionWindow() throws Exception
|
||||||
{
|
{
|
||||||
// On server, we don't consume the data.
|
// On server, we don't consume the data.
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
start(new ServerSessionListener.Adapter()
|
start(new ServerSessionListener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -745,16 +746,29 @@ public abstract class FlowControlStrategyTest
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
CountDownLatch closeLatch = new CountDownLatch(1);
|
|
||||||
Session session = newClient(new Session.Listener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(Session session, GoAwayFrame frame)
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session session = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
{
|
{
|
||||||
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
||||||
closeLatch.countDown();
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -800,13 +814,16 @@ public abstract class FlowControlStrategyTest
|
||||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
||||||
|
|
||||||
// Expect the connection to be closed.
|
// Expect the connection to be closed.
|
||||||
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientExceedingStreamWindow() throws Exception
|
public void testClientExceedingStreamWindow() throws Exception
|
||||||
{
|
{
|
||||||
// On server, we don't consume the data.
|
// On server, we don't consume the data.
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
start(new ServerSessionListener.Adapter()
|
start(new ServerSessionListener.Adapter()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -829,16 +846,29 @@ public abstract class FlowControlStrategyTest
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
CountDownLatch closeLatch = new CountDownLatch(1);
|
|
||||||
Session session = newClient(new Session.Listener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(Session session, GoAwayFrame frame)
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session session = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
{
|
{
|
||||||
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
||||||
closeLatch.countDown();
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -880,7 +910,9 @@ public abstract class FlowControlStrategyTest
|
||||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
||||||
|
|
||||||
// Expect the connection to be closed.
|
// Expect the connection to be closed.
|
||||||
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,963 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.http2.client;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.http.HttpFields;
|
||||||
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.http.HttpVersion;
|
||||||
|
import org.eclipse.jetty.http.MetaData;
|
||||||
|
import org.eclipse.jetty.http2.CloseState;
|
||||||
|
import org.eclipse.jetty.http2.ErrorCode;
|
||||||
|
import org.eclipse.jetty.http2.HTTP2Session;
|
||||||
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
|
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||||
|
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||||
|
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.FuturePromise;
|
||||||
|
import org.eclipse.jetty.util.Promise;
|
||||||
|
import org.eclipse.jetty.util.component.LifeCycle;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class GoAwayTest extends AbstractTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testClientGoAwayServerReplies() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(stream.getSession());
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||||
|
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
|
||||||
|
clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)serverSessionRef.get()).getCloseState());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)clientSession).getCloseState());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGoAwayWithInFlightStreamClientFailsStream() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(stream.getSession());
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
MetaData.Request request1 = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
CountDownLatch streamFailureLatch = new CountDownLatch(1);
|
||||||
|
clientSession.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
// Simulate the server closing while the client sends a second request.
|
||||||
|
// The server sends a lastStreamId for the first request, and discards the second.
|
||||||
|
serverSessionRef.get().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||||
|
// The client sends the second request and should eventually fail it
|
||||||
|
// locally since it has a larger streamId, and the server discarded it.
|
||||||
|
MetaData.Request request2 = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request2, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
|
streamFailureLatch.countDown();
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
// Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGracefulGoAway() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(stream.getSession());
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (!frame.isGraceful())
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
clientGracefulGoAwayLatch.countDown();
|
||||||
|
else
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (!frame.isGraceful())
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||||
|
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
|
||||||
|
clientLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Send a graceful GOAWAY from the server.
|
||||||
|
// Because the server had no pending streams, it will send also a non-graceful GOAWAY.
|
||||||
|
((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGracefulGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverStreamRef.set(stream);
|
||||||
|
Session session = stream.getSession();
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
|
||||||
|
// Send a graceful GOAWAY while processing a stream.
|
||||||
|
((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (!frame.isGraceful())
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
clientGracefulGoAwayLatch.countDown();
|
||||||
|
else
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (!frame.isGraceful())
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||||
|
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
|
||||||
|
clientLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the graceful GOAWAY.
|
||||||
|
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Now the client cannot create new streams.
|
||||||
|
FuturePromise<Stream> streamPromise = new FuturePromise<>();
|
||||||
|
clientSession.newStream(new HeadersFrame(newRequest(HttpMethod.GET.asString(), new HttpFields()), null, true), streamPromise, null);
|
||||||
|
Assertions.assertThrows(ExecutionException.class, () -> streamPromise.get(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// The client must not reply to a graceful GOAWAY.
|
||||||
|
Assertions.assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Previous streams must complete successfully.
|
||||||
|
Stream serverStream = serverStreamRef.get();
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// The server should have sent the GOAWAY after the last stream completed.
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverStreamRef.set(stream);
|
||||||
|
serverStreamLatch.countDown();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||||
|
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
|
||||||
|
clientLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// The client sends a GOAWAY.
|
||||||
|
clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// The client must not receive a GOAWAY until the all streams are completed.
|
||||||
|
Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Complete the stream.
|
||||||
|
Stream serverStream = serverStreamRef.get();
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGracefulGoAwayWithStreamsClientGoAwayServerClosesWhenLastStreamCloses() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverStreamRef.set(stream);
|
||||||
|
serverStreamLatch.countDown();
|
||||||
|
|
||||||
|
// Send a graceful GOAWAY while processing a stream.
|
||||||
|
((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
{
|
||||||
|
// Send a GOAWAY when receiving a graceful GOAWAY.
|
||||||
|
session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||||
|
if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200)
|
||||||
|
clientLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// The server has a pending stream, so it does not send the non-graceful GOAWAY yet.
|
||||||
|
Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Complete the stream, the server should send the non-graceful GOAWAY.
|
||||||
|
Stream serverStream = serverStreamRef.get();
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP);
|
||||||
|
|
||||||
|
// The server already received the client GOAWAY,
|
||||||
|
// so completing the last stream produces a close event.
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
// The client should receive the server non-graceful GOAWAY.
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientGracefulGoAwayWithStreamsServerGracefulGoAwayServerClosesWhenLastStreamCloses() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverStreamRef.set(stream);
|
||||||
|
return new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||||
|
{
|
||||||
|
if (frame.isEndStream())
|
||||||
|
{
|
||||||
|
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||||
|
stream.headers(new HeadersFrame(stream.getId(), response, null, true), callback);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
{
|
||||||
|
// Send a graceful GOAWAY.
|
||||||
|
((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
serverGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
clientGracefulGoAwayLatch.countDown();
|
||||||
|
else
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, false), promise, new Stream.Listener.Adapter());
|
||||||
|
Stream clientStream = promise.get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// Send a graceful GOAWAY from the client.
|
||||||
|
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
// The server should send a graceful GOAWAY.
|
||||||
|
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Complete the stream.
|
||||||
|
clientStream.data(new DataFrame(clientStream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
|
||||||
|
|
||||||
|
// Both client and server should send a non-graceful GOAWAY.
|
||||||
|
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverStreamRef.get().getSession()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientShutdownServerCloses() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter());
|
||||||
|
// Wait for the SETTINGS frames to be exchanged.
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
((HTTP2Session)clientSession).getEndPoint().close();
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onAccept(Session session)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
// Reply to the graceful GOAWAY from the server with a TCP close.
|
||||||
|
((HTTP2Session)session).getEndPoint().close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Wait for the SETTINGS frames to be exchanged.
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
// Send a graceful GOAWAY to the client.
|
||||||
|
((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerIdleTimeout() throws Exception
|
||||||
|
{
|
||||||
|
long idleTimeout = 1000;
|
||||||
|
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onAccept(Session session)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean onIdleTimeout(Session session)
|
||||||
|
{
|
||||||
|
serverIdleTimeoutLatch.countDown();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
|
// Server should send a GOAWAY to the client.
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
// The client replied to server's GOAWAY, but the server already closed.
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception
|
||||||
|
{
|
||||||
|
long idleTimeout = 1000;
|
||||||
|
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onAccept(Session session)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
|
// Send a graceful GOAWAY.
|
||||||
|
((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
clientGracefulGoAwayLatch.countDown();
|
||||||
|
else
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
// Send request headers but not data.
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter());
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
// Server idle timeout sends a non-graceful GOAWAY.
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception
|
||||||
|
{
|
||||||
|
long idleTimeout = 1000;
|
||||||
|
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverGracefulGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onAccept(Session session)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(session);
|
||||||
|
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
stream.setIdleTimeout(10 * idleTimeout);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
if (frame.isGraceful())
|
||||||
|
serverGracefulGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
CountDownLatch streamFailureLatch = new CountDownLatch(1);
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||||
|
{
|
||||||
|
streamFailureLatch.countDown();
|
||||||
|
callback.succeeded();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Client sends a graceful GOAWAY.
|
||||||
|
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||||
|
|
||||||
|
Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
|
Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerGoAwayWithStreamsThenStop() throws Exception
|
||||||
|
{
|
||||||
|
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||||
|
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||||
|
start(new ServerSessionListener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
|
{
|
||||||
|
serverSessionRef.set(stream.getSession());
|
||||||
|
// Don't reply, don't reset the stream, just send the GOAWAY.
|
||||||
|
stream.getSession().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
serverCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||||
|
Session clientSession = newClient(new Session.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientGoAwayLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
clientCloseLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
|
||||||
|
CountDownLatch clientResetLatch = new CountDownLatch(1);
|
||||||
|
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void onReset(Stream stream, ResetFrame frame)
|
||||||
|
{
|
||||||
|
clientResetLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
// Neither the client nor the server are finishing
|
||||||
|
// the pending stream, so force the stop on the server.
|
||||||
|
LifeCycle.stop(serverSessionRef.get());
|
||||||
|
|
||||||
|
// The server should reset all the pending streams.
|
||||||
|
Assertions.assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||||
|
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||||
|
}
|
||||||
|
}
|
|
@ -84,8 +84,8 @@ public class SessionFailureTest extends AbstractTest
|
||||||
@Override
|
@Override
|
||||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||||
{
|
{
|
||||||
// Forcibly close the connection.
|
// Forcibly shutdown the output to fail the write below.
|
||||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
|
||||||
// Now try to write something: it should fail.
|
// Now try to write something: it should fail.
|
||||||
stream.headers(frame, new Callback()
|
stream.headers(frame, new Callback()
|
||||||
{
|
{
|
||||||
|
|
|
@ -321,7 +321,9 @@ public class StreamCloseTest extends AbstractTest
|
||||||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||||
if ("GET".equals(request.getMethod()))
|
if ("GET".equals(request.getMethod()))
|
||||||
{
|
{
|
||||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
// Only shutdown the output, since closing the EndPoint causes a call to
|
||||||
|
// stop() on different thread which tries to concurrently fail the stream.
|
||||||
|
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
|
||||||
// Try to write something to force an error.
|
// Try to write something to force an error.
|
||||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP);
|
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP);
|
||||||
}
|
}
|
||||||
|
|
|
@ -239,7 +239,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
|
||||||
{
|
{
|
||||||
Runnable task = pollTask();
|
Runnable task = pollTask();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Dequeued task {}", task);
|
LOG.debug("Dequeued task {}", String.valueOf(task));
|
||||||
if (task != null)
|
if (task != null)
|
||||||
return task;
|
return task;
|
||||||
|
|
||||||
|
|
|
@ -365,7 +365,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
||||||
// If the failure came from within the
|
// If the failure came from within the
|
||||||
// flusher, we need to close the connection.
|
// flusher, we need to close the connection.
|
||||||
if (closed == null)
|
if (closed == null)
|
||||||
session.abort(x);
|
session.onWriteFailure(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
void terminate(Throwable cause)
|
void terminate(Throwable cause)
|
||||||
|
@ -376,7 +376,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
||||||
closed = terminated;
|
closed = terminated;
|
||||||
terminated = cause;
|
terminated = cause;
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{}", closed != null ? "Terminated" : "Terminating");
|
LOG.debug("{} {}", closed != null ? "Terminated" : "Terminating", this);
|
||||||
}
|
}
|
||||||
if (closed == null)
|
if (closed == null)
|
||||||
iterate();
|
iterate();
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -326,21 +326,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
|
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
|
||||||
dataLength = length >= 0 ? length : Long.MIN_VALUE;
|
dataLength = length >= 0 ? length : Long.MIN_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
callback.succeeded();
|
callback.succeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onData(DataFrame frame, Callback callback)
|
private void onData(DataFrame frame, Callback callback)
|
||||||
{
|
{
|
||||||
if (getRecvWindow() < 0)
|
|
||||||
{
|
|
||||||
// It's a bad client, it does not deserve to be
|
|
||||||
// treated gently by just resetting the stream.
|
|
||||||
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
|
|
||||||
callback.failed(new IOException("stream_window_exceeded"));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// SPEC: remotely closed streams must be replied with a reset.
|
// SPEC: remotely closed streams must be replied with a reset.
|
||||||
if (isRemotelyClosed())
|
if (isRemotelyClosed())
|
||||||
{
|
{
|
||||||
|
@ -381,7 +371,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
failure = new EofException("reset");
|
failure = new EofException("reset");
|
||||||
}
|
}
|
||||||
close();
|
close();
|
||||||
session.removeStream(this);
|
if (session.removeStream(this))
|
||||||
notifyReset(this, frame, callback);
|
notifyReset(this, frame, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,7 +395,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
||||||
failure = frame.getFailure();
|
failure = frame.getFailure();
|
||||||
}
|
}
|
||||||
close();
|
close();
|
||||||
session.removeStream(this);
|
if (session.removeStream(this))
|
||||||
notifyFailure(this, frame, callback);
|
notifyFailure(this, frame, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,8 +44,9 @@ public interface ISession extends Session
|
||||||
* <p>Removes the given {@code stream}.</p>
|
* <p>Removes the given {@code stream}.</p>
|
||||||
*
|
*
|
||||||
* @param stream the stream to remove
|
* @param stream the stream to remove
|
||||||
|
* @return whether the stream was removed
|
||||||
*/
|
*/
|
||||||
void removeStream(IStream stream);
|
boolean removeStream(IStream stream);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
|
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
|
||||||
|
|
|
@ -97,8 +97,6 @@ public interface Session
|
||||||
/**
|
/**
|
||||||
* <p>Closes the session by sending a GOAWAY frame with the given error code
|
* <p>Closes the session by sending a GOAWAY frame with the given error code
|
||||||
* and payload.</p>
|
* and payload.</p>
|
||||||
* <p>The GOAWAY frame is sent only once; subsequent or concurrent attempts to
|
|
||||||
* close the session will have no effect.</p>
|
|
||||||
*
|
*
|
||||||
* @param error the error code
|
* @param error the error code
|
||||||
* @param payload an optional payload (may be null)
|
* @param payload an optional payload (may be null)
|
||||||
|
@ -197,6 +195,16 @@ public interface Session
|
||||||
*
|
*
|
||||||
* @param session the session
|
* @param session the session
|
||||||
* @param frame the GOAWAY frame received
|
* @param frame the GOAWAY frame received
|
||||||
|
*/
|
||||||
|
default void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Callback method invoked when a GOAWAY frame caused the session to be closed.</p>
|
||||||
|
*
|
||||||
|
* @param session the session
|
||||||
|
* @param frame the GOAWAY frame that caused the session to be closed
|
||||||
* @param callback the callback to notify of the GOAWAY processing
|
* @param callback the callback to notify of the GOAWAY processing
|
||||||
*/
|
*/
|
||||||
default void onClose(Session session, GoAwayFrame frame, Callback callback)
|
default void onClose(Session session, GoAwayFrame frame, Callback callback)
|
||||||
|
|
|
@ -20,30 +20,33 @@ package org.eclipse.jetty.http2.frames;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
import org.eclipse.jetty.http2.CloseState;
|
|
||||||
import org.eclipse.jetty.http2.ErrorCode;
|
import org.eclipse.jetty.http2.ErrorCode;
|
||||||
|
|
||||||
public class GoAwayFrame extends Frame
|
public class GoAwayFrame extends Frame
|
||||||
{
|
{
|
||||||
private final CloseState closeState;
|
public static final GoAwayFrame GRACEFUL = new GoAwayFrame(Integer.MAX_VALUE, ErrorCode.NO_ERROR.code, new byte[]{'g', 'r', 'a', 'c', 'e', 'f', 'u', 'l'});
|
||||||
|
|
||||||
private final int lastStreamId;
|
private final int lastStreamId;
|
||||||
private final int error;
|
private final int error;
|
||||||
private final byte[] payload;
|
private final byte[] payload;
|
||||||
|
|
||||||
public GoAwayFrame(int lastStreamId, int error, byte[] payload)
|
public GoAwayFrame(int lastStreamId, int error, byte[] payload)
|
||||||
{
|
|
||||||
this(CloseState.REMOTELY_CLOSED, lastStreamId, error, payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
public GoAwayFrame(CloseState closeState, int lastStreamId, int error, byte[] payload)
|
|
||||||
{
|
{
|
||||||
super(FrameType.GO_AWAY);
|
super(FrameType.GO_AWAY);
|
||||||
this.closeState = closeState;
|
|
||||||
this.lastStreamId = lastStreamId;
|
this.lastStreamId = lastStreamId;
|
||||||
this.error = error;
|
this.error = error;
|
||||||
this.payload = payload;
|
this.payload = payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether this GOAWAY frame is graceful, i.e. its {@code lastStreamId == Integer.MAX_VALUE}
|
||||||
|
*/
|
||||||
|
public boolean isGraceful()
|
||||||
|
{
|
||||||
|
// SPEC: section 6.8.
|
||||||
|
return lastStreamId == Integer.MAX_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
public int getLastStreamId()
|
public int getLastStreamId()
|
||||||
{
|
{
|
||||||
return lastStreamId;
|
return lastStreamId;
|
||||||
|
@ -76,11 +79,10 @@ public class GoAwayFrame extends Frame
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s,%d/%s/%s/%s",
|
return String.format("%s{%d/%s/%s}",
|
||||||
super.toString(),
|
super.toString(),
|
||||||
lastStreamId,
|
lastStreamId,
|
||||||
ErrorCode.toString(error, null),
|
ErrorCode.toString(error, null),
|
||||||
tryConvertPayload(),
|
tryConvertPayload());
|
||||||
closeState);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ public class AbstractTest
|
||||||
{
|
{
|
||||||
protected Server server;
|
protected Server server;
|
||||||
protected ServerConnector connector;
|
protected ServerConnector connector;
|
||||||
|
protected HTTP2Client http2Client;
|
||||||
protected HttpClient client;
|
protected HttpClient client;
|
||||||
|
|
||||||
protected void start(ServerSessionListener listener) throws Exception
|
protected void start(ServerSessionListener listener) throws Exception
|
||||||
|
@ -63,12 +64,13 @@ public class AbstractTest
|
||||||
server.addConnector(connector);
|
server.addConnector(connector);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void prepareClient() throws Exception
|
protected void prepareClient()
|
||||||
{
|
{
|
||||||
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()), null);
|
http2Client = new HTTP2Client();
|
||||||
|
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client), null);
|
||||||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||||
clientExecutor.setName("client");
|
clientExecutor.setName("client");
|
||||||
client.setExecutor(clientExecutor);
|
this.client.setExecutor(clientExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
|
|
@ -220,7 +220,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
|
||||||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||||
if (HttpMethod.HEAD.is(request.getMethod()))
|
if (HttpMethod.HEAD.is(request.getMethod()))
|
||||||
{
|
{
|
||||||
stream.getSession().close(ErrorCode.REFUSED_STREAM_ERROR.code, null, Callback.NOOP);
|
int error = ErrorCode.REFUSED_STREAM_ERROR.code;
|
||||||
|
stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP);
|
||||||
|
stream.getSession().close(error, null, Callback.NOOP);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -104,6 +104,12 @@ public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnecti
|
||||||
delegate.onReset(session, frame);
|
delegate.onReset(session, frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGoAway(Session session, GoAwayFrame frame)
|
||||||
|
{
|
||||||
|
delegate.onGoAway(session, frame);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(Session session, GoAwayFrame frame)
|
public void onClose(Session session, GoAwayFrame frame)
|
||||||
{
|
{
|
||||||
|
|
|
@ -503,6 +503,6 @@ public abstract class IteratingCallback implements Callback
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s[%s]", super.toString(), _state);
|
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue