Merge pull request #5634 from eclipse/jetty-9.4.x-5310-http2_goaway
Review HTTP/2 GOAWAY handling
This commit is contained in:
commit
68e70b47de
|
@ -92,20 +92,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesIn()
|
||||
{
|
||||
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
|
||||
return session.getStreamsOpened();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesOut()
|
||||
{
|
||||
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
|
||||
return session.getStreamsClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
|
@ -125,15 +111,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
|
|||
ISession session = getSession();
|
||||
|
||||
int windowDelta = client.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
session.updateRecvWindow(windowDelta);
|
||||
if (windowDelta > 0)
|
||||
{
|
||||
session.updateRecvWindow(windowDelta);
|
||||
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
|
||||
}
|
||||
else
|
||||
{
|
||||
session.frames(null, Arrays.asList(prefaceFrame, settingsFrame), this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.CloseState;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
|
@ -62,7 +63,10 @@ public class HTTP2ClientSession extends HTTP2Session
|
|||
else
|
||||
{
|
||||
stream.process(frame, Callback.NOOP);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyHeaders(stream, frame);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -136,7 +136,7 @@ public class AsyncServletTest extends AbstractTest
|
|||
HeadersFrame frame = new HeadersFrame(metaData, null, true);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
CountDownLatch failLatch = new CountDownLatch(1);
|
||||
session.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -146,9 +146,10 @@ public class AsyncServletTest extends AbstractTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
|
||||
{
|
||||
resetLatch.countDown();
|
||||
failLatch.countDown();
|
||||
callback.succeeded();
|
||||
}
|
||||
});
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
|
@ -156,7 +157,7 @@ public class AsyncServletTest extends AbstractTest
|
|||
|
||||
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
|
||||
assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
assertTrue(failLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -137,10 +137,10 @@ public abstract class FlowControlStrategyTest
|
|||
@Test
|
||||
public void testWindowSizeUpdates() throws Exception
|
||||
{
|
||||
final CountDownLatch prefaceLatch = new CountDownLatch(1);
|
||||
final CountDownLatch stream1Latch = new CountDownLatch(1);
|
||||
final CountDownLatch stream2Latch = new CountDownLatch(1);
|
||||
final CountDownLatch settingsLatch = new CountDownLatch(1);
|
||||
CountDownLatch prefaceLatch = new CountDownLatch(1);
|
||||
CountDownLatch stream1Latch = new CountDownLatch(1);
|
||||
CountDownLatch stream2Latch = new CountDownLatch(1);
|
||||
CountDownLatch settingsLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -233,11 +233,11 @@ public abstract class FlowControlStrategyTest
|
|||
// then we change the window to 512 B. At this point, the client
|
||||
// must stop sending data (although the initial window allows it).
|
||||
|
||||
final int size = 512;
|
||||
int size = 512;
|
||||
// We get 3 data frames: the first of 1024 and 2 of 512 each
|
||||
// after the flow control window has been reduced.
|
||||
final CountDownLatch dataLatch = new CountDownLatch(3);
|
||||
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
|
||||
CountDownLatch dataLatch = new CountDownLatch(3);
|
||||
AtomicReference<Callback> callbackRef = new AtomicReference<>();
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -276,7 +276,7 @@ public abstract class FlowControlStrategyTest
|
|||
});
|
||||
|
||||
// Two SETTINGS frames, the initial one and the one we send from the server.
|
||||
final CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -313,9 +313,9 @@ public abstract class FlowControlStrategyTest
|
|||
@Test
|
||||
public void testServerFlowControlOneBigWrite() throws Exception
|
||||
{
|
||||
final int windowSize = 1536;
|
||||
final int length = 5 * windowSize;
|
||||
final CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
int windowSize = 1536;
|
||||
int length = 5 * windowSize;
|
||||
CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -350,13 +350,13 @@ public abstract class FlowControlStrategyTest
|
|||
|
||||
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
final Exchanger<Callback> exchanger = new Exchanger<>();
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
Exchanger<Callback> exchanger = new Exchanger<>();
|
||||
MetaData.Request metaData = newRequest("GET", new HttpFields());
|
||||
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
|
||||
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
private AtomicInteger dataFrames = new AtomicInteger();
|
||||
private final AtomicInteger dataFrames = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
|
@ -407,10 +407,10 @@ public abstract class FlowControlStrategyTest
|
|||
@Test
|
||||
public void testClientFlowControlOneBigWrite() throws Exception
|
||||
{
|
||||
final int windowSize = 1536;
|
||||
final Exchanger<Callback> exchanger = new Exchanger<>();
|
||||
final CountDownLatch settingsLatch = new CountDownLatch(1);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
int windowSize = 1536;
|
||||
Exchanger<Callback> exchanger = new Exchanger<>();
|
||||
CountDownLatch settingsLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -429,7 +429,7 @@ public abstract class FlowControlStrategyTest
|
|||
stream.headers(responseFrame, Callback.NOOP);
|
||||
return new Stream.Listener.Adapter()
|
||||
{
|
||||
private AtomicInteger dataFrames = new AtomicInteger();
|
||||
private final AtomicInteger dataFrames = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
|
@ -481,7 +481,7 @@ public abstract class FlowControlStrategyTest
|
|||
session.newStream(requestFrame, streamPromise, null);
|
||||
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
||||
|
||||
final int length = 5 * windowSize;
|
||||
int length = 5 * windowSize;
|
||||
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
|
||||
stream.data(dataFrame, Callback.NOOP);
|
||||
|
||||
|
@ -500,7 +500,7 @@ public abstract class FlowControlStrategyTest
|
|||
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void checkThatWeAreFlowControlStalled(Exchanger<Callback> exchanger) throws Exception
|
||||
private void checkThatWeAreFlowControlStalled(Exchanger<Callback> exchanger)
|
||||
{
|
||||
assertThrows(TimeoutException.class,
|
||||
() -> exchanger.exchange(null, 1, TimeUnit.SECONDS));
|
||||
|
@ -509,7 +509,7 @@ public abstract class FlowControlStrategyTest
|
|||
@Test
|
||||
public void testSessionStalledStallsNewStreams() throws Exception
|
||||
{
|
||||
final int windowSize = 1024;
|
||||
int windowSize = 1024;
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -544,8 +544,8 @@ public abstract class FlowControlStrategyTest
|
|||
Session session = newClient(new Session.Listener.Adapter());
|
||||
|
||||
// First request is just to consume most of the session window.
|
||||
final List<Callback> callbacks1 = new ArrayList<>();
|
||||
final CountDownLatch prepareLatch = new CountDownLatch(1);
|
||||
List<Callback> callbacks1 = new ArrayList<>();
|
||||
CountDownLatch prepareLatch = new CountDownLatch(1);
|
||||
MetaData.Request request1 = newRequest("POST", new HttpFields());
|
||||
session.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
|
@ -584,7 +584,7 @@ public abstract class FlowControlStrategyTest
|
|||
});
|
||||
|
||||
// Fourth request is now stalled.
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
MetaData.Request request4 = newRequest("GET", new HttpFields());
|
||||
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
|
@ -613,7 +613,7 @@ public abstract class FlowControlStrategyTest
|
|||
@Test
|
||||
public void testServerSendsBigContent() throws Exception
|
||||
{
|
||||
final byte[] data = new byte[1024 * 1024];
|
||||
byte[] data = new byte[1024 * 1024];
|
||||
new Random().nextBytes(data);
|
||||
|
||||
start(new ServerSessionListener.Adapter()
|
||||
|
@ -637,8 +637,8 @@ public abstract class FlowControlStrategyTest
|
|||
Session session = newClient(new Session.Listener.Adapter());
|
||||
MetaData.Request metaData = newRequest("GET", new HttpFields());
|
||||
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
|
||||
final byte[] bytes = new byte[data.length];
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
byte[] bytes = new byte[data.length];
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
private int received;
|
||||
|
@ -682,7 +682,7 @@ public abstract class FlowControlStrategyTest
|
|||
}
|
||||
});
|
||||
|
||||
final int initialWindow = 16;
|
||||
int initialWindow = 16;
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -698,11 +698,11 @@ public abstract class FlowControlStrategyTest
|
|||
new Random().nextBytes(requestData);
|
||||
|
||||
byte[] responseData = new byte[requestData.length];
|
||||
final ByteBuffer responseContent = ByteBuffer.wrap(responseData);
|
||||
ByteBuffer responseContent = ByteBuffer.wrap(responseData);
|
||||
MetaData.Request metaData = newRequest("GET", new HttpFields());
|
||||
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
|
||||
Promise.Completable<Stream> completable = new Promise.Completable<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
session.newStream(requestFrame, completable, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -731,6 +731,7 @@ public abstract class FlowControlStrategyTest
|
|||
public void testClientExceedingSessionWindow() throws Exception
|
||||
{
|
||||
// On server, we don't consume the data.
|
||||
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -745,16 +746,29 @@ public abstract class FlowControlStrategyTest
|
|||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverCloseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
||||
closeLatch.countDown();
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientCloseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -765,7 +779,7 @@ public abstract class FlowControlStrategyTest
|
|||
session.newStream(requestFrame, Promise.from(completable), new Stream.Listener.Adapter());
|
||||
Stream stream = completable.get(5, TimeUnit.SECONDS);
|
||||
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
|
||||
{
|
||||
@Override
|
||||
|
@ -797,16 +811,19 @@ public abstract class FlowControlStrategyTest
|
|||
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
||||
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
||||
List<ByteBuffer> buffers = lease.getByteBuffers();
|
||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
||||
|
||||
// Expect the connection to be closed.
|
||||
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientExceedingStreamWindow() throws Exception
|
||||
{
|
||||
// On server, we don't consume the data.
|
||||
CountDownLatch serverCloseLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -829,16 +846,29 @@ public abstract class FlowControlStrategyTest
|
|||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverCloseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientCloseLatch = new CountDownLatch(1);
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
|
||||
closeLatch.countDown();
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientCloseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -849,7 +879,7 @@ public abstract class FlowControlStrategyTest
|
|||
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
|
||||
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
|
||||
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
|
||||
{
|
||||
@Override
|
||||
|
@ -877,10 +907,12 @@ public abstract class FlowControlStrategyTest
|
|||
ByteBuffer extraData = ByteBuffer.allocate(1024);
|
||||
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
|
||||
List<ByteBuffer> buffers = lease.getByteBuffers();
|
||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
|
||||
|
||||
// Expect the connection to be closed.
|
||||
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -917,7 +949,7 @@ public abstract class FlowControlStrategyTest
|
|||
MetaData.Request metaData = newRequest("POST", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(metaData, null, false);
|
||||
FuturePromise<Stream> streamPromise = new FuturePromise<>();
|
||||
final CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
|
@ -930,7 +962,7 @@ public abstract class FlowControlStrategyTest
|
|||
|
||||
// Perform a big upload that will stall the flow control windows.
|
||||
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
final CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
stream.data(new DataFrame(stream.getId(), data, true), new Callback()
|
||||
{
|
||||
@Override
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -84,8 +84,8 @@ public class SessionFailureTest extends AbstractTest
|
|||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
// Forcibly close the connection.
|
||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
||||
// Forcibly shutdown the output to fail the write below.
|
||||
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
|
||||
// Now try to write something: it should fail.
|
||||
stream.headers(frame, new Callback()
|
||||
{
|
||||
|
|
|
@ -321,7 +321,9 @@ public class StreamCloseTest extends AbstractTest
|
|||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||
if ("GET".equals(request.getMethod()))
|
||||
{
|
||||
((HTTP2Session)stream.getSession()).getEndPoint().close();
|
||||
// Only shutdown the output, since closing the EndPoint causes a call to
|
||||
// stop() on different thread which tries to concurrently fail the stream.
|
||||
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
|
||||
// Try to write something to force an error.
|
||||
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP);
|
||||
}
|
||||
|
|
|
@ -239,7 +239,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
|
|||
{
|
||||
Runnable task = pollTask();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Dequeued task {}", task);
|
||||
LOG.debug("Dequeued task {}", String.valueOf(task));
|
||||
if (task != null)
|
||||
return task;
|
||||
|
||||
|
|
|
@ -365,7 +365,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
// If the failure came from within the
|
||||
// flusher, we need to close the connection.
|
||||
if (closed == null)
|
||||
session.abort(x);
|
||||
session.onWriteFailure(x);
|
||||
}
|
||||
|
||||
void terminate(Throwable cause)
|
||||
|
@ -376,7 +376,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
closed = terminated;
|
||||
terminated = cause;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}", closed != null ? "Terminated" : "Terminating");
|
||||
LOG.debug("{} {}", closed != null ? "Terminated" : "Terminating", this);
|
||||
}
|
||||
if (closed == null)
|
||||
iterate();
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -21,7 +21,6 @@ package org.eclipse.jetty.http2;
|
|||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.WritePendingException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -144,7 +143,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
localReset = true;
|
||||
failure = new EOFException("reset");
|
||||
}
|
||||
session.frames(this, Collections.singletonList(frame), callback);
|
||||
((HTTP2Session)session).reset(this, frame, callback);
|
||||
}
|
||||
|
||||
private boolean startWrite(Callback callback)
|
||||
|
@ -327,24 +326,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
|
||||
dataLength = length >= 0 ? length : Long.MIN_VALUE;
|
||||
}
|
||||
|
||||
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void onData(DataFrame frame, Callback callback)
|
||||
{
|
||||
if (getRecvWindow() < 0)
|
||||
{
|
||||
// It's a bad client, it does not deserve to be
|
||||
// treated gently by just resetting the stream.
|
||||
session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
|
||||
callback.failed(new IOException("stream_window_exceeded"));
|
||||
return;
|
||||
}
|
||||
|
||||
// SPEC: remotely closed streams must be replied with a reset.
|
||||
if (isRemotelyClosed())
|
||||
{
|
||||
|
@ -371,10 +357,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
|
||||
session.removeStream(this);
|
||||
|
||||
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyData(this, frame, callback);
|
||||
if (closed)
|
||||
session.removeStream(this);
|
||||
}
|
||||
|
||||
private void onReset(ResetFrame frame, Callback callback)
|
||||
|
@ -385,8 +371,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
failure = new EofException("reset");
|
||||
}
|
||||
close();
|
||||
session.removeStream(this);
|
||||
notifyReset(this, frame, callback);
|
||||
if (session.removeStream(this))
|
||||
notifyReset(this, frame, callback);
|
||||
}
|
||||
|
||||
private void onPush(PushPromiseFrame frame, Callback callback)
|
||||
|
@ -409,8 +395,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
failure = frame.getFailure();
|
||||
}
|
||||
close();
|
||||
session.removeStream(this);
|
||||
notifyFailure(this, frame, callback);
|
||||
if (session.removeStream(this))
|
||||
notifyFailure(this, frame, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,8 +44,9 @@ public interface ISession extends Session
|
|||
* <p>Removes the given {@code stream}.</p>
|
||||
*
|
||||
* @param stream the stream to remove
|
||||
* @return whether the stream was removed
|
||||
*/
|
||||
void removeStream(IStream stream);
|
||||
boolean removeStream(IStream stream);
|
||||
|
||||
/**
|
||||
* <p>Sends the given list of frames to create a new {@link Stream}.</p>
|
||||
|
|
|
@ -97,8 +97,6 @@ public interface Session
|
|||
/**
|
||||
* <p>Closes the session by sending a GOAWAY frame with the given error code
|
||||
* and payload.</p>
|
||||
* <p>The GOAWAY frame is sent only once; subsequent or concurrent attempts to
|
||||
* close the session will have no effect.</p>
|
||||
*
|
||||
* @param error the error code
|
||||
* @param payload an optional payload (may be null)
|
||||
|
@ -197,6 +195,16 @@ public interface Session
|
|||
*
|
||||
* @param session the session
|
||||
* @param frame the GOAWAY frame received
|
||||
*/
|
||||
default void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a GOAWAY frame caused the session to be closed.</p>
|
||||
*
|
||||
* @param session the session
|
||||
* @param frame the GOAWAY frame that caused the session to be closed
|
||||
* @param callback the callback to notify of the GOAWAY processing
|
||||
*/
|
||||
default void onClose(Session session, GoAwayFrame frame, Callback callback)
|
||||
|
|
|
@ -20,30 +20,33 @@ package org.eclipse.jetty.http2.frames;
|
|||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.eclipse.jetty.http2.CloseState;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
|
||||
public class GoAwayFrame extends Frame
|
||||
{
|
||||
private final CloseState closeState;
|
||||
public static final GoAwayFrame GRACEFUL = new GoAwayFrame(Integer.MAX_VALUE, ErrorCode.NO_ERROR.code, new byte[]{'g', 'r', 'a', 'c', 'e', 'f', 'u', 'l'});
|
||||
|
||||
private final int lastStreamId;
|
||||
private final int error;
|
||||
private final byte[] payload;
|
||||
|
||||
public GoAwayFrame(int lastStreamId, int error, byte[] payload)
|
||||
{
|
||||
this(CloseState.REMOTELY_CLOSED, lastStreamId, error, payload);
|
||||
}
|
||||
|
||||
public GoAwayFrame(CloseState closeState, int lastStreamId, int error, byte[] payload)
|
||||
{
|
||||
super(FrameType.GO_AWAY);
|
||||
this.closeState = closeState;
|
||||
this.lastStreamId = lastStreamId;
|
||||
this.error = error;
|
||||
this.payload = payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this GOAWAY frame is graceful, i.e. its {@code lastStreamId == Integer.MAX_VALUE}
|
||||
*/
|
||||
public boolean isGraceful()
|
||||
{
|
||||
// SPEC: section 6.8.
|
||||
return lastStreamId == Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
public int getLastStreamId()
|
||||
{
|
||||
return lastStreamId;
|
||||
|
@ -76,11 +79,10 @@ public class GoAwayFrame extends Frame
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s,%d/%s/%s/%s",
|
||||
return String.format("%s{%d/%s/%s}",
|
||||
super.toString(),
|
||||
lastStreamId,
|
||||
ErrorCode.toString(error, null),
|
||||
tryConvertPayload(),
|
||||
closeState);
|
||||
tryConvertPayload());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ public class AbstractTest
|
|||
{
|
||||
protected Server server;
|
||||
protected ServerConnector connector;
|
||||
protected HTTP2Client http2Client;
|
||||
protected HttpClient client;
|
||||
|
||||
protected void start(ServerSessionListener listener) throws Exception
|
||||
|
@ -63,12 +64,13 @@ public class AbstractTest
|
|||
server.addConnector(connector);
|
||||
}
|
||||
|
||||
protected void prepareClient() throws Exception
|
||||
protected void prepareClient()
|
||||
{
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()), null);
|
||||
http2Client = new HTTP2Client();
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client), null);
|
||||
QueuedThreadPool clientExecutor = new QueuedThreadPool();
|
||||
clientExecutor.setName("client");
|
||||
client.setExecutor(clientExecutor);
|
||||
this.client.setExecutor(clientExecutor);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
|
|
@ -220,7 +220,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
|
|||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||
if (HttpMethod.HEAD.is(request.getMethod()))
|
||||
{
|
||||
stream.getSession().close(ErrorCode.REFUSED_STREAM_ERROR.code, null, Callback.NOOP);
|
||||
int error = ErrorCode.REFUSED_STREAM_ERROR.code;
|
||||
stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP);
|
||||
stream.getSession().close(error, null, Callback.NOOP);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
#org.eclipse.jetty.LEVEL=DEBUG
|
||||
#org.eclipse.jetty.client.LEVEL=DEBUG
|
||||
org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
||||
#org.eclipse.jetty.http2.LEVEL=DEBUG
|
||||
org.eclipse.jetty.http2.hpack.LEVEL=INFO
|
||||
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Collections;
|
|||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.CloseState;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
|
@ -64,18 +65,12 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
settings = Collections.emptyMap();
|
||||
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
|
||||
|
||||
WindowUpdateFrame windowFrame = null;
|
||||
int sessionWindow = getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
updateRecvWindow(sessionWindow);
|
||||
if (sessionWindow > 0)
|
||||
{
|
||||
updateRecvWindow(sessionWindow);
|
||||
windowFrame = new WindowUpdateFrame(0, sessionWindow);
|
||||
}
|
||||
|
||||
if (windowFrame == null)
|
||||
frames(null, Collections.singletonList(settingsFrame), Callback.NOOP);
|
||||
frames(null, Arrays.asList(settingsFrame, new WindowUpdateFrame(0, sessionWindow)), Callback.NOOP);
|
||||
else
|
||||
frames(null, Arrays.asList(settingsFrame, windowFrame), Callback.NOOP);
|
||||
frames(null, Collections.singletonList(settingsFrame), Callback.NOOP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,8 +104,11 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
{
|
||||
onStreamOpened(stream);
|
||||
stream.process(frame, Callback.NOOP);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
Stream.Listener listener = notifyNewStream(stream, frame);
|
||||
stream.setListener(listener);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +127,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
|
|||
if (stream != null)
|
||||
{
|
||||
stream.process(frame, Callback.NOOP);
|
||||
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
|
||||
notifyHeaders(stream, frame);
|
||||
if (closed)
|
||||
removeStream(stream);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -104,6 +104,12 @@ public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnecti
|
|||
delegate.onReset(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
delegate.onGoAway(session, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Session session, GoAwayFrame frame)
|
||||
{
|
||||
|
|
|
@ -503,6 +503,6 @@ public abstract class IteratingCallback implements Callback
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s]", super.toString(), _state);
|
||||
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue