Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-06-18 15:37:09 +02:00
commit 0b854a1f5f
9 changed files with 98 additions and 39 deletions

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -118,17 +117,6 @@ public class HTTP2ClientSession extends HTTP2Session
} }
} }
@Override
protected void onResetForUnknownStream(ResetFrame frame)
{
int streamId = frame.getStreamId();
boolean closed = isClientStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
if (closed)
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}
@Override @Override
public void onPushPromise(PushPromiseFrame frame) public void onPushPromise(PushPromiseFrame frame)
{ {

View File

@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
@ -51,6 +52,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Flusher; import org.eclipse.jetty.http2.HTTP2Flusher;
@ -1042,6 +1044,64 @@ public class StreamResetTest extends AbstractTest
} }
} }
@Test
public void testResetBeforeReceivingWindowUpdate() throws Exception
{
int window = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
float ratio = 0.5F;
AtomicReference<Stream> streamRef = new AtomicReference<>();
Consumer<AbstractHTTP2ServerConnectionFactory> http2Factory = http2 ->
{
http2.setInitialSessionRecvWindow(window);
http2.setInitialStreamRecvWindow(window);
http2.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(ratio)
{
@Override
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
// Before sending the window update, reset from the client side.
if (stream != null)
streamRef.get().reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
super.sendWindowUpdate(stream, session, frame);
}
});
};
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
// Consume the request content as it arrives.
return new Stream.Listener.Adapter();
}
}, http2Factory);
CountDownLatch failureLatch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
});
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(requestFrame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
streamRef.set(stream);
// Send enough bytes to trigger the server to send a window update.
ByteBuffer content = ByteBuffer.allocate((int)(window * ratio) + 1024);
stream.data(new DataFrame(stream.getId(), content, false), Callback.NOOP);
assertFalse(failureLatch.await(1, TimeUnit.SECONDS));
}
private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException
{ {
long start = System.nanoTime(); long start = System.nanoTime();

View File

@ -199,18 +199,22 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
protected void onSessionUnstalled(ISession session) protected void onSessionUnstalled(ISession session)
{ {
sessionStallTime.addAndGet(System.nanoTime() - sessionStall.getAndSet(0)); long stallTime = System.nanoTime() - sessionStall.getAndSet(0);
sessionStallTime.addAndGet(stallTime);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Session unstalled {}", session); LOG.debug("Session unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), session);
} }
protected void onStreamUnstalled(IStream stream) protected void onStreamUnstalled(IStream stream)
{ {
Long time = streamsStalls.remove(stream); Long time = streamsStalls.remove(stream);
if (time != null) if (time != null)
streamsStallTime.addAndGet(System.nanoTime() - time); {
if (LOG.isDebugEnabled()) long stallTime = System.nanoTime() - time;
LOG.debug("Stream unstalled {}", stream); streamsStallTime.addAndGet(stallTime);
if (LOG.isDebugEnabled())
LOG.debug("Stream unstalled after {} ms {}", TimeUnit.NANOSECONDS.toMillis(stallTime), stream);
}
} }
@ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true) @ManagedAttribute(value = "The time, in milliseconds, that the session flow control has stalled", readonly = true)

View File

@ -123,7 +123,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
session.updateRecvWindow(level); session.updateRecvWindow(level);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session); LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session);
session.frames(null, Callback.NOOP, new WindowUpdateFrame(0, level), Frame.EMPTY_ARRAY); sendWindowUpdate(null, session, new WindowUpdateFrame(0, level));
} }
else else
{ {
@ -157,7 +157,7 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
stream.updateRecvWindow(level); stream.updateRecvWindow(level);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream); LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream);
session.frames(stream, Callback.NOOP, new WindowUpdateFrame(stream.getId(), level), Frame.EMPTY_ARRAY); sendWindowUpdate(stream, session, new WindowUpdateFrame(stream.getId(), level));
} }
else else
{ {
@ -169,6 +169,11 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
} }
} }
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
session.frames(stream, Callback.NOOP, frame, Frame.EMPTY_ARRAY);
}
@Override @Override
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame) public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{ {

View File

@ -263,15 +263,23 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// We must enlarge the session flow control window, // We must enlarge the session flow control window,
// otherwise other requests will be stalled. // otherwise other requests will be stalled.
flowControl.onDataConsumed(this, null, flowControlLength); flowControl.onDataConsumed(this, null, flowControlLength);
boolean local = (streamId & 1) == (localStreamIds.get() & 1); if (isStreamClosed(streamId))
boolean closed = local ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
if (closed)
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback); reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback);
else else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback);
} }
} }
private boolean isStreamClosed(int streamId)
{
return isLocalStream(streamId) ? isLocalStreamClosed(streamId) : isRemoteStreamClosed(streamId);
}
private boolean isLocalStream(int streamId)
{
return (streamId & 1) == (localStreamIds.get() & 1);
}
protected boolean isLocalStreamClosed(int streamId) protected boolean isLocalStreamClosed(int streamId)
{ {
return streamId <= localStreamIds.get(); return streamId <= localStreamIds.get();
@ -310,7 +318,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
} }
protected abstract void onResetForUnknownStream(ResetFrame frame); protected void onResetForUnknownStream(ResetFrame frame)
{
if (isStreamClosed(frame.getStreamId()))
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}
@Override @Override
public void onSettings(SettingsFrame frame) public void onSettings(SettingsFrame frame)
@ -481,7 +495,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
} }
else else
{ {
if (!isRemoteStreamClosed(streamId)) if (!isStreamClosed(streamId))
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_window_update_frame"); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_window_update_frame");
} }
} }

View File

@ -794,10 +794,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}", return String.format("%s@%x#%d@%x{sendWindow=%s,recvWindow=%s,demand=%d,reset=%b/%b,%s,age=%d,attachment=%s}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
getId(), getId(),
session.hashCode(),
sendWindow, sendWindow,
recvWindow, recvWindow,
demand(), demand(),

View File

@ -157,17 +157,6 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis
} }
} }
@Override
protected void onResetForUnknownStream(ResetFrame frame)
{
int streamId = frame.getStreamId();
boolean closed = isClientStream(streamId) ? isRemoteStreamClosed(streamId) : isLocalStreamClosed(streamId);
if (closed)
notifyReset(this, frame);
else
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_rst_stream_frame");
}
@Override @Override
public void onPushPromise(PushPromiseFrame frame) public void onPushPromise(PushPromiseFrame frame)
{ {

View File

@ -65,7 +65,7 @@
<localRepoPath>${project.build.directory}/local-repo</localRepoPath> <localRepoPath>${project.build.directory}/local-repo</localRepoPath>
<settingsPath>src/it/settings.xml</settingsPath> <settingsPath>src/it/settings.xml</settingsPath>
<surefire.rerunFailingTestsCount>0</surefire.rerunFailingTestsCount> <surefire.rerunFailingTestsCount>0</surefire.rerunFailingTestsCount>
<testcontainers.version>1.14.2</testcontainers.version> <testcontainers.version>1.14.3</testcontainers.version>
</properties> </properties>
<licenses> <licenses>

View File

@ -31,11 +31,9 @@ import java.sql.SQLException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.ClassLoadingObjectInputStream; import org.eclipse.jetty.util.ClassLoadingObjectInputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.Slf4jLogConsumer;