Fixes #2037 - HTTP/2 stream reset leaves stream frames in the flusher.
Now waking up the flusher via iterate() after a reset has been received. This ensures that frames that may have stalled are removed from the flusher queue. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
6b7f906f9d
commit
d88e2b767f
|
@ -46,6 +46,8 @@ import org.eclipse.jetty.http.HttpVersion;
|
|||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Flusher;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
|
@ -54,6 +56,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
|||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.HttpChannel;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
|
@ -631,6 +634,55 @@ public class StreamResetTest extends AbstractTest
|
|||
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetAfterBlockingWrite() throws Exception
|
||||
{
|
||||
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
|
||||
CountDownLatch writeLatch = new CountDownLatch(1);
|
||||
start(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
ServletOutputStream output = response.getOutputStream();
|
||||
output.write(new byte[10 * windowSize]);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
writeLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
AtomicLong received = new AtomicLong();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Session client = newClient(new Session.Listener.Adapter());
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, true);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
client.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
if (received.addAndGet(frame.getData().remaining()) == windowSize)
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Reset.
|
||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
HTTP2Session session = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class).getBean(HTTP2Session.class);
|
||||
HTTP2Flusher flusher = session.getBean(HTTP2Flusher.class);
|
||||
Assert.assertEquals(0, flusher.getFrameQueueSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception
|
||||
{
|
||||
|
|
|
@ -116,7 +116,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private int getFrameQueueSize()
|
||||
public int getFrameQueueSize()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
|
@ -141,15 +141,12 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
entry.perform();
|
||||
}
|
||||
|
||||
if (!frames.isEmpty())
|
||||
for (Entry entry : frames)
|
||||
{
|
||||
for (Entry entry : frames)
|
||||
{
|
||||
entries.offer(entry);
|
||||
actives.add(entry);
|
||||
}
|
||||
frames.clear();
|
||||
entries.offer(entry);
|
||||
actives.add(entry);
|
||||
}
|
||||
frames.clear();
|
||||
}
|
||||
|
||||
|
||||
|
@ -166,11 +163,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {}", entry);
|
||||
|
||||
// If the stream has been reset, don't send the frame.
|
||||
if (entry.reset())
|
||||
// If the stream has been reset or removed, don't send the frame.
|
||||
if (entry.isStale())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resetting {}", entry);
|
||||
LOG.debug("Stale {}", entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -328,7 +325,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
{
|
||||
protected final Frame frame;
|
||||
protected final IStream stream;
|
||||
private boolean reset;
|
||||
|
||||
protected Entry(Frame frame, IStream stream, Callback callback)
|
||||
{
|
||||
|
@ -346,7 +342,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
|
||||
private void complete()
|
||||
{
|
||||
if (reset)
|
||||
if (!isProtocol() && stream != null && stream.isReset())
|
||||
failed(new EofException("reset"));
|
||||
else
|
||||
succeeded();
|
||||
|
@ -363,23 +359,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
super.failed(x);
|
||||
}
|
||||
|
||||
private boolean reset()
|
||||
private boolean isStale()
|
||||
{
|
||||
return this.reset = stream != null && stream.isReset() && !isProtocol();
|
||||
return !isProtocol() && stream != null && (stream.isReset() || stream.getSession().getStream(stream.getId()) == null);
|
||||
}
|
||||
|
||||
private boolean isProtocol()
|
||||
{
|
||||
switch (frame.getType())
|
||||
{
|
||||
case DATA:
|
||||
case HEADERS:
|
||||
case PUSH_PROMISE:
|
||||
case CONTINUATION:
|
||||
return false;
|
||||
case PRIORITY:
|
||||
case RST_STREAM:
|
||||
case SETTINGS:
|
||||
case PING:
|
||||
case GO_AWAY:
|
||||
case WINDOW_UPDATE:
|
||||
case PREFACE:
|
||||
case DISCONNECT:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -282,7 +282,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
IStream stream = getStream(frame.getStreamId());
|
||||
if (stream != null)
|
||||
stream.process(frame, Callback.NOOP);
|
||||
stream.process(frame, new ResetCallback());
|
||||
else
|
||||
notifyReset(this, frame);
|
||||
}
|
||||
|
@ -753,8 +753,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
IStream removed = streams.remove(stream.getId());
|
||||
if (removed != null)
|
||||
{
|
||||
assert removed == stream;
|
||||
|
||||
boolean local = stream.isLocal();
|
||||
if (local)
|
||||
localStreamCount.decrementAndGet();
|
||||
|
@ -1329,6 +1327,32 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
}
|
||||
}
|
||||
|
||||
private class ResetCallback implements Callback
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InvocationType getInvocationType()
|
||||
{
|
||||
return InvocationType.NON_BLOCKING;
|
||||
}
|
||||
|
||||
private void complete()
|
||||
{
|
||||
flusher.iterate();
|
||||
}
|
||||
}
|
||||
|
||||
private class CloseCallback implements Callback
|
||||
{
|
||||
private final int error;
|
||||
|
|
|
@ -88,9 +88,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
@Override
|
||||
public void headers(HeadersFrame frame, Callback callback)
|
||||
{
|
||||
if (!startWrite(callback))
|
||||
return;
|
||||
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
|
||||
if (startWrite(callback))
|
||||
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,9 +101,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
@Override
|
||||
public void data(DataFrame frame, Callback callback)
|
||||
{
|
||||
if (!startWrite(callback))
|
||||
return;
|
||||
session.data(this, this, frame);
|
||||
if (startWrite(callback))
|
||||
session.data(this, this, frame);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -292,8 +290,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
remoteReset = true;
|
||||
close();
|
||||
session.removeStream(this);
|
||||
callback.succeeded();
|
||||
notifyReset(this, frame);
|
||||
notifyReset(this, frame, callback);
|
||||
}
|
||||
|
||||
private void onPush(PushPromiseFrame frame, Callback callback)
|
||||
|
@ -377,8 +374,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
@Override
|
||||
public void close()
|
||||
{
|
||||
closeState.set(CloseState.CLOSED);
|
||||
onClose();
|
||||
if (closeState.getAndSet(CloseState.CLOSED) != CloseState.CLOSED)
|
||||
onClose();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -417,14 +414,14 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyReset(Stream stream, ResetFrame frame)
|
||||
private void notifyReset(Stream stream, ResetFrame frame, Callback callback)
|
||||
{
|
||||
final Listener listener = this.listener;
|
||||
if (listener == null)
|
||||
return;
|
||||
try
|
||||
{
|
||||
listener.onReset(stream, frame);
|
||||
listener.onReset(stream, frame, callback);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
|
|
@ -163,6 +163,19 @@ public interface Stream
|
|||
*/
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback);
|
||||
|
||||
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
onReset(stream, frame);
|
||||
callback.succeeded();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
|
||||
*
|
||||
|
@ -170,7 +183,9 @@ public interface Stream
|
|||
* @param frame the RST_FRAME received
|
||||
* @see Session.Listener#onReset(Session, ResetFrame)
|
||||
*/
|
||||
public void onReset(Stream stream, ResetFrame frame);
|
||||
public default void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when the stream exceeds its idle timeout.</p>
|
||||
|
|
|
@ -163,12 +163,12 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
public void onReset(Stream stream, ResetFrame frame, Callback callback)
|
||||
{
|
||||
ErrorCode error = ErrorCode.from(frame.getError());
|
||||
if (error == null)
|
||||
error = ErrorCode.CANCEL_STREAM_ERROR;
|
||||
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), Callback.NOOP);
|
||||
getConnection().onStreamFailure((IStream)stream, new EofException("HTTP/2 " + error), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue