Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-11-26 16:36:14 +01:00
commit c85cef4da5
22 changed files with 2252 additions and 744 deletions

View File

@ -94,20 +94,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
this.listener = listener;
}
@Override
public long getMessagesIn()
{
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
return session.getStreamsOpened();
}
@Override
public long getMessagesOut()
{
HTTP2ClientSession session = (HTTP2ClientSession)getSession();
return session.getStreamsClosed();
}
@Override
public void onOpen()
{
@ -127,15 +113,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
ISession session = getSession();
int windowDelta = client.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
session.updateRecvWindow(windowDelta);
if (windowDelta > 0)
{
session.updateRecvWindow(windowDelta);
session.frames(null, List.of(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this);
}
else
{
session.frames(null, List.of(prefaceFrame, settingsFrame), this);
}
}
@Override

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.client;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
@ -62,7 +63,10 @@ public class HTTP2ClientSession extends HTTP2Session
else
{
stream.process(frame, Callback.NOOP);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(stream, frame);
if (closed)
removeStream(stream);
}
}
else

View File

@ -134,7 +134,7 @@ public class AsyncServletTest extends AbstractTest
HeadersFrame frame = new HeadersFrame(metaData, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch failLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
@ -144,9 +144,10 @@ public class AsyncServletTest extends AbstractTest
}
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
resetLatch.countDown();
failLatch.countDown();
callback.succeeded();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
@ -154,7 +155,7 @@ public class AsyncServletTest extends AbstractTest
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(failLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test

View File

@ -137,10 +137,10 @@ public abstract class FlowControlStrategyTest
@Test
public void testWindowSizeUpdates() throws Exception
{
final CountDownLatch prefaceLatch = new CountDownLatch(1);
final CountDownLatch stream1Latch = new CountDownLatch(1);
final CountDownLatch stream2Latch = new CountDownLatch(1);
final CountDownLatch settingsLatch = new CountDownLatch(1);
CountDownLatch prefaceLatch = new CountDownLatch(1);
CountDownLatch stream1Latch = new CountDownLatch(1);
CountDownLatch stream2Latch = new CountDownLatch(1);
CountDownLatch settingsLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
@ -233,11 +233,11 @@ public abstract class FlowControlStrategyTest
// then we change the window to 512 B. At this point, the client
// must stop sending data (although the initial window allows it).
final int size = 512;
int size = 512;
// We get 3 data frames: the first of 1024 and 2 of 512 each
// after the flow control window has been reduced.
final CountDownLatch dataLatch = new CountDownLatch(3);
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
CountDownLatch dataLatch = new CountDownLatch(3);
AtomicReference<Callback> callbackRef = new AtomicReference<>();
start(new ServerSessionListener.Adapter()
{
@Override
@ -275,7 +275,7 @@ public abstract class FlowControlStrategyTest
});
// Two SETTINGS frames, the initial one and the one we send from the server.
final CountDownLatch settingsLatch = new CountDownLatch(2);
CountDownLatch settingsLatch = new CountDownLatch(2);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
@ -312,9 +312,9 @@ public abstract class FlowControlStrategyTest
@Test
public void testServerFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final int length = 5 * windowSize;
final CountDownLatch settingsLatch = new CountDownLatch(2);
int windowSize = 1536;
int length = 5 * windowSize;
CountDownLatch settingsLatch = new CountDownLatch(2);
start(new ServerSessionListener.Adapter()
{
@Override
@ -349,13 +349,13 @@ public abstract class FlowControlStrategyTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final CountDownLatch dataLatch = new CountDownLatch(1);
final Exchanger<Callback> exchanger = new Exchanger<>();
CountDownLatch dataLatch = new CountDownLatch(1);
Exchanger<Callback> exchanger = new Exchanger<>();
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -406,10 +406,10 @@ public abstract class FlowControlStrategyTest
@Test
public void testClientFlowControlOneBigWrite() throws Exception
{
final int windowSize = 1536;
final Exchanger<Callback> exchanger = new Exchanger<>();
final CountDownLatch settingsLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
int windowSize = 1536;
Exchanger<Callback> exchanger = new Exchanger<>();
CountDownLatch settingsLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
@ -428,7 +428,7 @@ public abstract class FlowControlStrategyTest
stream.headers(responseFrame, Callback.NOOP);
return new Stream.Listener.Adapter()
{
private AtomicInteger dataFrames = new AtomicInteger();
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -480,7 +480,7 @@ public abstract class FlowControlStrategyTest
session.newStream(requestFrame, streamPromise, null);
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
final int length = 5 * windowSize;
int length = 5 * windowSize;
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.NOOP);
@ -499,7 +499,7 @@ public abstract class FlowControlStrategyTest
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
private void checkThatWeAreFlowControlStalled(Exchanger<Callback> exchanger) throws Exception
private void checkThatWeAreFlowControlStalled(Exchanger<Callback> exchanger)
{
assertThrows(TimeoutException.class,
() -> exchanger.exchange(null, 1, TimeUnit.SECONDS));
@ -508,7 +508,7 @@ public abstract class FlowControlStrategyTest
@Test
public void testSessionStalledStallsNewStreams() throws Exception
{
final int windowSize = 1024;
int windowSize = 1024;
start(new ServerSessionListener.Adapter()
{
@Override
@ -543,8 +543,8 @@ public abstract class FlowControlStrategyTest
Session session = newClient(new Session.Listener.Adapter());
// First request is just to consume most of the session window.
final List<Callback> callbacks1 = new ArrayList<>();
final CountDownLatch prepareLatch = new CountDownLatch(1);
List<Callback> callbacks1 = new ArrayList<>();
CountDownLatch prepareLatch = new CountDownLatch(1);
MetaData.Request request1 = newRequest("POST", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@ -583,7 +583,7 @@ public abstract class FlowControlStrategyTest
});
// Fourth request is now stalled.
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
MetaData.Request request4 = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@ -612,7 +612,7 @@ public abstract class FlowControlStrategyTest
@Test
public void testServerSendsBigContent() throws Exception
{
final byte[] data = new byte[1024 * 1024];
byte[] data = new byte[1024 * 1024];
new Random().nextBytes(data);
start(new ServerSessionListener.Adapter()
@ -636,8 +636,8 @@ public abstract class FlowControlStrategyTest
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
final byte[] bytes = new byte[data.length];
final CountDownLatch latch = new CountDownLatch(1);
byte[] bytes = new byte[data.length];
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
private int received;
@ -681,7 +681,7 @@ public abstract class FlowControlStrategyTest
}
});
final int initialWindow = 16;
int initialWindow = 16;
Session session = newClient(new Session.Listener.Adapter()
{
@Override
@ -697,11 +697,11 @@ public abstract class FlowControlStrategyTest
new Random().nextBytes(requestData);
byte[] responseData = new byte[requestData.length];
final ByteBuffer responseContent = ByteBuffer.wrap(responseData);
ByteBuffer responseContent = ByteBuffer.wrap(responseData);
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
Promise.Completable<Stream> completable = new Promise.Completable<>();
final CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, completable, new Stream.Listener.Adapter()
{
@Override
@ -730,6 +730,7 @@ public abstract class FlowControlStrategyTest
public void testClientExceedingSessionWindow() throws Exception
{
// On server, we don't consume the data.
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
@ -744,16 +745,29 @@ public abstract class FlowControlStrategyTest
}
};
}
});
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
serverCloseLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
closeLatch.countDown();
clientGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
@ -764,7 +778,7 @@ public abstract class FlowControlStrategyTest
session.newStream(requestFrame, Promise.from(completable), new Stream.Listener.Adapter());
Stream stream = completable.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
@ -796,16 +810,19 @@ public abstract class FlowControlStrategyTest
ByteBuffer extraData = ByteBuffer.allocate(1024);
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
List<ByteBuffer> buffers = lease.getByteBuffers();
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
// Expect the connection to be closed.
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testClientExceedingStreamWindow() throws Exception
{
// On server, we don't consume the data.
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
@ -828,16 +845,29 @@ public abstract class FlowControlStrategyTest
}
};
}
});
final CountDownLatch closeLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onClose(Session session, GoAwayFrame frame)
{
serverCloseLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code)
closeLatch.countDown();
clientGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
@ -848,7 +878,7 @@ public abstract class FlowControlStrategyTest
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
@ -876,10 +906,12 @@ public abstract class FlowControlStrategyTest
ByteBuffer extraData = ByteBuffer.allocate(1024);
http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining());
List<ByteBuffer> buffers = lease.getByteBuffers();
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()]));
http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0]));
// Expect the connection to be closed.
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
}
@Test
@ -916,7 +948,7 @@ public abstract class FlowControlStrategyTest
MetaData.Request metaData = newRequest("POST", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
final CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
@ -929,7 +961,7 @@ public abstract class FlowControlStrategyTest
// Perform a big upload that will stall the flow control windows.
ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, true), new Callback()
{
@Override

View File

@ -930,9 +930,20 @@ public class HTTP2Test extends AbstractTest
// Avoid aggressive idle timeout to allow the test verifications.
connector.setShutdownIdleTimeout(connector.getIdleTimeout());
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
if (frame.isGraceful())
clientGracefulGoAwayLatch.countDown();
else
clientGoAwayLatch.countDown();
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{
@ -977,26 +988,20 @@ public class HTTP2Test extends AbstractTest
int port = connector.getLocalPort();
CompletableFuture<Void> shutdown = Graceful.shutdown(server);
// GOAWAY should not arrive to the client yet.
assertFalse(clientCloseLatch.await(1, TimeUnit.SECONDS));
// Client should receive the graceful GOAWAY.
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Client should not receive the non-graceful GOAWAY.
assertFalse(clientGoAwayLatch.await(500, TimeUnit.MILLISECONDS));
// Client should not be closed yet.
assertFalse(clientCloseLatch.await(500, TimeUnit.MILLISECONDS));
// New requests should be immediately rejected.
// Client cannot create new requests after receiving a GOAWAY.
HostPortHttpField authority3 = new HostPortHttpField("localhost" + ":" + port);
MetaData.Request metaData3 = new MetaData.Request("GET", HttpScheme.HTTP.asString(), authority3, servletPath, HttpVersion.HTTP_2, HttpFields.EMPTY, -1);
HeadersFrame request3 = new HeadersFrame(metaData3, null, false);
HeadersFrame request3 = new HeadersFrame(metaData3, null, true);
FuturePromise<Stream> promise3 = new FuturePromise<>();
CountDownLatch resetLatch = new CountDownLatch(1);
clientSession.newStream(request3, promise3, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream3 = promise3.get(5, TimeUnit.SECONDS);
stream3.data(new DataFrame(stream3.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
clientSession.newStream(request3, promise3, new Stream.Listener.Adapter());
assertThrows(ExecutionException.class, () -> promise3.get(5, TimeUnit.SECONDS));
// Finish the previous requests and expect the responses.
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
@ -1005,9 +1010,9 @@ public class HTTP2Test extends AbstractTest
assertNull(shutdown.get(5, TimeUnit.SECONDS));
// Now GOAWAY should arrive to the client.
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
// Wait to process the GOAWAY frames and close the EndPoints.
Thread.sleep(1000);
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertFalse(((HTTP2Session)serverSession).getEndPoint().isOpen());
}

View File

@ -84,8 +84,8 @@ public class SessionFailureTest extends AbstractTest
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
// Forcibly close the connection.
((HTTP2Session)stream.getSession()).getEndPoint().close();
// Forcibly shutdown the output to fail the write below.
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
// Now try to write something: it should fail.
stream.headers(frame, new Callback()
{

View File

@ -321,7 +321,9 @@ public class StreamCloseTest extends AbstractTest
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("GET".equals(request.getMethod()))
{
((HTTP2Session)stream.getSession()).getEndPoint().close();
// Only shutdown the output, since closing the EndPoint causes a call to
// stop() on different thread which tries to concurrently fail the stream.
((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput();
// Try to write something to force an error.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP);
}

View File

@ -267,7 +267,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
{
Runnable task = pollTask();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued task {}", task);
LOG.debug("Dequeued task {}", String.valueOf(task));
if (task != null)
return task;

View File

@ -367,7 +367,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
// If the failure came from within the
// flusher, we need to close the connection.
if (closed == null)
session.abort(x);
session.onWriteFailure(x);
}
void terminate(Throwable cause)
@ -378,7 +378,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
closed = terminated;
terminated = cause;
if (LOG.isDebugEnabled())
LOG.debug("{}", closed != null ? "Terminated" : "Terminating");
LOG.debug("{} {}", closed != null ? "Terminated" : "Terminating", this);
}
if (closed == null)
iterate();

View File

@ -158,7 +158,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, List.of(frame), callback);
((HTTP2Session)session).reset(this, frame, callback);
}
private boolean startWrite(Callback callback)
@ -367,24 +367,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
length = fields.getLongField(HttpHeader.CONTENT_LENGTH);
dataLength = length >= 0 ? length : Long.MIN_VALUE;
}
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
callback.succeeded();
}
private void onData(DataFrame frame, Callback callback)
{
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
((HTTP2Session)session).onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded");
callback.failed(new IOException("stream_window_exceeded"));
return;
}
// SPEC: remotely closed streams must be replied with a reset.
if (isRemotelyClosed())
{
@ -483,9 +470,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
dataEntry = dataQueue.poll();
}
DataFrame frame = dataEntry.frame;
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyDataDemanded(this, frame, dataEntry.callback);
if (closed)
session.removeStream(this);
}
}
@ -505,8 +493,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
failure = new EofException("reset");
}
close();
session.removeStream(this);
notifyReset(this, frame, callback);
if (session.removeStream(this))
notifyReset(this, frame, callback);
}
private void onPush(PushPromiseFrame frame, Callback callback)
@ -529,8 +517,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
failure = frame.getFailure();
}
close();
session.removeStream(this);
notifyFailure(this, frame, callback);
if (session.removeStream(this))
notifyFailure(this, frame, callback);
}
@Override

View File

@ -45,8 +45,9 @@ public interface ISession extends Session
* <p>Removes the given {@code stream}.</p>
*
* @param stream the stream to remove
* @return whether the stream was removed
*/
public void removeStream(IStream stream);
public boolean removeStream(IStream stream);
/**
* <p>Sends the given list of frames to create a new {@link Stream}.</p>

View File

@ -113,8 +113,6 @@ public interface Session
/**
* <p>Closes the session by sending a GOAWAY frame with the given error code
* and payload.</p>
* <p>The GOAWAY frame is sent only once; subsequent or concurrent attempts to
* close the session will have no effect.</p>
*
* @param error the error code
* @param payload an optional payload (may be null)
@ -225,6 +223,16 @@ public interface Session
*
* @param session the session
* @param frame the GOAWAY frame received
*/
default void onGoAway(Session session, GoAwayFrame frame)
{
}
/**
* <p>Callback method invoked when a GOAWAY frame caused the session to be closed.</p>
*
* @param session the session
* @param frame the GOAWAY frame that caused the session to be closed
* @param callback the callback to notify of the GOAWAY processing
*/
public default void onClose(Session session, GoAwayFrame frame, Callback callback)

View File

@ -20,30 +20,33 @@ package org.eclipse.jetty.http2.frames;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
public class GoAwayFrame extends Frame
{
private final CloseState closeState;
public static final GoAwayFrame GRACEFUL = new GoAwayFrame(Integer.MAX_VALUE, ErrorCode.NO_ERROR.code, new byte[]{'g', 'r', 'a', 'c', 'e', 'f', 'u', 'l'});
private final int lastStreamId;
private final int error;
private final byte[] payload;
public GoAwayFrame(int lastStreamId, int error, byte[] payload)
{
this(CloseState.REMOTELY_CLOSED, lastStreamId, error, payload);
}
public GoAwayFrame(CloseState closeState, int lastStreamId, int error, byte[] payload)
{
super(FrameType.GO_AWAY);
this.closeState = closeState;
this.lastStreamId = lastStreamId;
this.error = error;
this.payload = payload;
}
/**
* @return whether this GOAWAY frame is graceful, i.e. its {@code lastStreamId == Integer.MAX_VALUE}
*/
public boolean isGraceful()
{
// SPEC: section 6.8.
return lastStreamId == Integer.MAX_VALUE;
}
public int getLastStreamId()
{
return lastStreamId;
@ -76,11 +79,10 @@ public class GoAwayFrame extends Frame
@Override
public String toString()
{
return String.format("%s,%d/%s/%s/%s",
return String.format("%s{%d/%s/%s}",
super.toString(),
lastStreamId,
ErrorCode.toString(error, null),
tryConvertPayload(),
closeState);
tryConvertPayload());
}
}

View File

@ -40,13 +40,13 @@ import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,29 +99,43 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
public void upgrade(Map<String, Object> context)
{
HttpResponse response = (HttpResponse)context.get(HttpResponse.class.getName());
HttpRequest request = (HttpRequest)response.getRequest();
// In case of HTTP/1.1 upgrade to HTTP/2, the request is HTTP/1.1
// (with upgrade) for a resource, and the response is HTTP/2.
// Create the implicit stream#1 so that it can receive the HTTP/2 response.
MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders());
// We do not support upgrade requests with content, so endStream=true.
HeadersFrame frame = new HeadersFrame(metaData, null, true);
IStream stream = ((HTTP2Session)session).newLocalStream(frame, null);
stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND);
HttpResponse response = (HttpResponse)context.get(HttpResponse.class.getName());
HttpRequest request = (HttpRequest)response.getRequest();
HttpExchange exchange = request.getConversation().getExchanges().peekLast();
HttpChannelOverHTTP2 http2Channel = acquireHttpChannel();
activeChannels.add(http2Channel);
HttpExchange newExchange = new HttpExchange(exchange.getHttpDestination(), exchange.getRequest(), List.of());
http2Channel.associate(newExchange);
stream.setListener(http2Channel.getStreamListener());
http2Channel.setStream(stream);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade completed for {}", this);
// Create the implicit stream#1 so that it can receive the HTTP/2 response.
MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders());
// We do not support upgrade requests with content, so endStream=true.
HeadersFrame frame = new HeadersFrame(metaData, null, true);
((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), new Promise<>()
{
@Override
public void succeeded(Stream stream)
{
http2Channel.setStream(stream);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this);
}
@Override
public void failed(Throwable failure)
{
newExchange.requestComplete(failure);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this);
}
});
}
@Override

View File

@ -35,6 +35,7 @@ public class AbstractTest
{
protected Server server;
protected ServerConnector connector;
protected HTTP2Client http2Client;
protected HttpClient client;
protected void start(ServerSessionListener listener) throws Exception
@ -63,12 +64,13 @@ public class AbstractTest
server.addConnector(connector);
}
protected void prepareClient() throws Exception
protected void prepareClient()
{
client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()));
http2Client = new HTTP2Client();
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
client.setExecutor(clientExecutor);
this.client.setExecutor(clientExecutor);
}
@AfterEach

View File

@ -226,7 +226,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if (HttpMethod.HEAD.is(request.getMethod()))
{
stream.getSession().close(ErrorCode.REFUSED_STREAM_ERROR.code, null, Callback.NOOP);
int error = ErrorCode.REFUSED_STREAM_ERROR.code;
stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP);
stream.getSession().close(error, null, Callback.NOOP);
}
else
{

View File

@ -1,6 +1,6 @@
# Jetty Logging using jetty-slf4j-impl
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
@ -65,18 +66,12 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
settings = Collections.emptyMap();
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
WindowUpdateFrame windowFrame = null;
int sessionWindow = getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
updateRecvWindow(sessionWindow);
if (sessionWindow > 0)
{
updateRecvWindow(sessionWindow);
windowFrame = new WindowUpdateFrame(0, sessionWindow);
}
if (windowFrame == null)
frames(null, List.of(settingsFrame), Callback.NOOP);
frames(null, List.of(settingsFrame, new WindowUpdateFrame(0, sessionWindow)), Callback.NOOP);
else
frames(null, List.of(settingsFrame, windowFrame), Callback.NOOP);
frames(null, List.of(settingsFrame), Callback.NOOP);
}
@Override
@ -105,31 +100,26 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
}
else
{
if (isClosed())
stream = createRemoteStream(streamId, (MetaData.Request)metaData);
if (stream != null)
{
updateLastRemoteStreamId(streamId);
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
}
else
{
stream = createRemoteStream(streamId, (MetaData.Request)metaData);
if (stream != null)
onStreamOpened(stream);
if (metaData instanceof MetaData.ConnectRequest)
{
onStreamOpened(stream);
if (metaData instanceof MetaData.ConnectRequest)
if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null)
{
if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null)
{
stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
stream.process(frame, Callback.NOOP);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
}
stream.process(frame, Callback.NOOP);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
Stream.Listener listener = notifyNewStream(stream, frame);
stream.setListener(listener);
if (closed)
removeStream(stream);
}
}
}
@ -148,7 +138,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
if (stream != null)
{
stream.process(frame, Callback.NOOP);
boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED);
notifyHeaders(stream, frame);
if (closed)
removeStream(stream);
}
else
{

View File

@ -109,6 +109,12 @@ public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnecti
delegate.onReset(session, frame);
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
delegate.onGoAway(session, frame);
}
@Override
public void onClose(Session session, GoAwayFrame frame)
{

View File

@ -492,6 +492,6 @@ public abstract class IteratingCallback implements Callback
@Override
public String toString()
{
return String.format("%s[%s]", super.toString(), _state);
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state);
}
}